
    >                        S r SSKJr  SSKJr  SSKJr  SSKJr  SSKrSSKrSSKrSSK	r	SSK
r
SSKrSSKJr  SSKJr  SS	KJr   SSKrS
rSrSrSrSrSqSqSq\R:                  " SSS/5      r \	R>                  " S5      r  " S S\#5      r$ " S S\#5      r%S r&S r'SS jr(S r)\4S jr*g! \ a  rSr SrCNnSrCff = f! \!\"4 a    \	r  NLf = f)z:Utility classes and methods for the parallelism framework.    )absolute_import)print_function)division)unicode_literalsN)	constants)system_util)queueTF<      )zThere were noztasks to do MultiprocessingIsAvailableResultis_availablestack_traceforkc                   N    \ rS rSrSrSS jrS rS rSS jrS r	S	 r
SS
 jrSrg)
AtomicDictN   zThread-safe (and optionally process-safe) dictionary protected by a lock.

If a multiprocessing.Manager is supplied on init, the dictionary is
both process and thread safe. Otherwise, it is only thread-safe.
Nc                     U(       a+  UR                  5       U l        UR                  5       U l        g[        R                   " 5       U l        0 U l        g)zInitializes the dict.

Args:
  manager: (multiprocessing.Manager or None) Manager instance (required for
      cross-process safety), or none if cross-process safety is not needed.
N)Locklockdict	threading)selfmanagers     9platform/gsutil/gslib/utils/parallelism_framework_util.py__init__AtomicDict.__init__U   s6     ,,.di,,.di.."didi    c                 n    U R                      U R                  U   sS S S 5        $ ! , (       d  f       g = fNr   r   r   keys     r   __getitem__AtomicDict.__getitem__c   s    	YYs^ 
s   &
4c                 l    U R                      X R                  U'   S S S 5        g ! , (       d  f       g = fr   r    )r   r"   values      r   __setitem__AtomicDict.__setitem__g   s    	iin 
s   %
3c                     U R                      U R                  R                  X5      sS S S 5        $ ! , (       d  f       g = fr   r   r   get)r   r"   default_values      r   r+   AtomicDict.getl   s#    	YY]]3. 
s	   2
A c                 j    U R                      U R                  U	 S S S 5        g ! , (       d  f       g = fr   r    r!   s     r   deleteAtomicDict.deletep   s    	
))C. 
s   $
2c                     U R                      U R                  R                  5       sS S S 5        $ ! , (       d  f       g = fr   )r   r   valuesr   s    r   r2   AtomicDict.valuest   s#    	YY 
s   1
?c                     U R                      U R                  R                  X5      U-   nX@R                  U'   UsSSS5        $ ! , (       d  f       g= f)a}  Atomically updates the stored value associated with the given key.

Performs the atomic equivalent of
dict[key] = dict.get(key, default_value) + inc.

Args:
  key: lookup key for the value of the first operand of the "+" operation.
  inc: Second operand of the "+" operation.
  default_value: Default value if there is no existing value for the key.

Returns:
  Incremented value.
Nr*   )r   r"   incr,   vals        r   	IncrementAtomicDict.Incrementx   s:     
IIMM#-3ciin 
s   .A
A)r   r   r   r   )__name__
__module____qualname____firstlineno____doc__r   r#   r'   r+   r/   r2   r8   __static_attributes__ r   r   r   r   N   s*    
/ r   r   c                   :    \ rS rSrSrS rS
S jrS rS rS r	Sr
g	)ProcessAndThreadSafeInt   a  This class implements a process and thread-safe integer.

It is backed either by a multiprocessing Value of type 'i' or an internal
threading lock.  This simplifies the calling pattern for
global variables that could be a Multiprocessing.Value or an integer.
Without this class, callers need to write code like this:

global variable_name
if isinstance(variable_name, int):
  return variable_name
else:
  return variable_name.value
c                     Xl         U R                   (       a  [        R                  SS5      U l        g [        R
                  " 5       U l        SU l        g )Nir   )multiprocessing_is_availablemultiprocessing_contextValuer&   r   r   r   )r   rG   s     r   r    ProcessAndThreadSafeInt.__init__   s:    (D%((*00a8dj.."didjr   c                     U R                   (       a  XR                  l        g U R                     Xl        S S S 5        g ! , (       d  f       g = fr   rG   r&   r   )r   reset_values     r   ResetProcessAndThreadSafeInt.Reset   s+    (($jj99 
 99s	   ?
Ac                     U R                   (       a   U R                  =R                  S-  sl        g U R                     U =R                  S-  sl        S S S 5        g ! , (       d  f       g = fN   rL   r3   s    r   r8   !ProcessAndThreadSafeInt.Increment   >    ((
jj!99

a
 99   A
A+c                     U R                   (       a   U R                  =R                  S-  sl        g U R                     U =R                  S-  sl        S S S 5        g ! , (       d  f       g = frQ   rL   r3   s    r   	Decrement!ProcessAndThreadSafeInt.Decrement   rT   rU   c                     U R                   (       a  U R                  R                  $ U R                     U R                  sS S S 5        $ ! , (       d  f       g = fr   rL   r3   s    r   GetValue ProcessAndThreadSafeInt.GetValue   s3    ((ZZ99zz 99s   A


A)r   rG   r&   Nr:   )r;   r<   r=   r>   r?   r   rN   r8   rW   rZ   r@   rA   r   r   rC   rC      s     !r   rC   c                 |    [         R                  " U 5      u  p#X2:  a   [         R                  " XU45        U$ X!:  a   [         R                  " XU45        U$ U$ ! [         R                  [        4 a     gf = f! [         R                  [        4 a     N^f = f! [         R                  [        4 a    Us $ f = f)a  Sets a new soft limit for the maximum number of open files.

The soft limit is used for this process (and its children), but the
hard limit is set by the system and cannot be exceeded.

We will first try to set the soft limit to the hard limit's value; if that
fails, we will try to set the soft limit to the fallback_value iff this would
increase the soft limit.

Args:
  resource_name: Name of the resource to increase the soft limit for.
  fallback_value: Fallback value to be used if we couldn't set the
                  soft value to the hard value (e.g., if the hard value
                  is "unlimited").

Returns:
  Current soft limit for the resource (after any changes we were able to
  make), or -1 if the resource doesn't exist.
)resource	getrlimiterror
ValueError	setrlimit)resource_namefallback_value
soft_limit
hard_limits       r   _IncreaseSoftLimitForResourcerg      s    ,'11-@Z Z(@A  (DE / ..*	%  NNJ' 
 NNJ'  s4   A A<  B A98A9<BBB;:B;c                     [         R                  (       a  g[         R                  (       a  g [        SS5       n U R	                  5       R                  5        HL  nSU;   d  M  UR                  S5      S   R                  S5      nS	UR                  5       ;   U4s  sS
S
S
5        $     S
S
S
5        g! , (       d  f       g
= f! [         aW  nUR                  [        R                  :X  a3  [        R                  " SUR                  [        U5      4-  5         S
nAge S
nAf[         a8  n[        R                  " SR!                  [        U5      5      5         S
nAgS
nAff = f)a  Determines if the OS doesn't support multiprocessing.

There are two cases we currently know about:
  - Multiple processes are not supported on Windows.
  - If an error is encountered while using multiple processes on Alpine Linux
    gsutil hangs. For this case it's possible we could do more work to find
    the root cause but after a fruitless initial attempt we decided instead
    to fall back on multi-threading w/o multiprocesing.

Returns:
  (bool indicator if multiprocessing should be prohibited, OS name)
)TWindows)FmacOSz/etc/os-releaserzNAME==rR   "zalpine linuxN)FUnknownzeUnable to open /etc/os-release to determine whether OS supports multiprocessing: errno=%d, message=%szYSomething went wrong while trying to determine multiprocessing capabilities.
Message: {0})r   
IS_WINDOWSIS_OSXopenread
splitlinessplitstriplowerIOErrorerrnoENOENTloggingdebugstr	Exceptionformat)flineos_nameeexcs        r   ShouldProhibitMultiprocessingr      s      		% &&(%%'$d?JJsOA&,,S1' GMMO3W=
= 
&	% (
   
&	%	% 
 ww%,,mm EWWc!f%& '  	 MM @@F#hA ! 	sY   C &B3$9B3	C 'B3*C 3
C=C C 
E%AD D  E%-.E  E%c                    [         bC  U (       a*  U R                  [        5        U R                  [        5        [        [         [        S9$ [        5       u  pU(       a'  SU-  nU (       a  U R                  U5        [        SSS9$ SnSnSn  [        R                  SS5        [        R                  5       q
S
n[        (       af   [        U[        [        R                  [         R"                  5      5      n [        U[        [        R&                  [         R"                  5      5      nU[         R"                  :  a  USU-  -  n[)        SU-  5      e Uq UqUq[        [         [        S9$ !   US	-  ne = f! [$         a     Nf = f! [$         a     Njf = f!   [*        R,                  " 5       nSnU b"  U R                  U5        U R                  U5         N= f)aQ  Checks if multiprocessing is available, and if so performs initialization.

There are some environments in which there is no way to use multiprocessing
logic that's built into Python (e.g., if /dev/shm is not available, then
we can't create semaphores). This simply tries out a few things that will be
needed to make sure the environment can support the pieces of the
multiprocessing module that we need.

See gslib.command.InitializeMultiprocessingVariables for
an explanation of why this is necessary.

Args:
  logger: (logging.Logger) Logger to use for debug output.

Returns:
  (MultiprocessingIsAvailableResult) A namedtuple with the following attrs:
    - multiprocessing_is_available: True iff the multiprocessing module is
          available for use.
    - stack_trace: The stack trace generated by the call we tried that
          failed.
N)r   r   z
Multiple processes are not supported on %s. Operations requesting
parallelism will be executed with multiple threads in a single process only.
FTz
You have requested multiple processes for an operation, but the
required functionality of Python's multiprocessing module is not available.
Operations requesting parallelism will be executed with multiple threads in a
single process only.
rF   r   zI
Please ensure that you have write access to both /dev/shm and /run/shm.
r]   a  
Your max number of open files, %s, is too low to allow safe multiprocessing.
On Linux you can fix this by adding something like "ulimit -n 10000" to your
~/.bashrc or equivalent file and opening a new terminal.

On macOS, you may also need to run a command like this once (in addition to the
above instructions), which might require a restart of your system to take
effect:
  launchctl limit maxfiles 10000

Alternatively, edit /etc/launchd.conf with something like:
  limit maxfiles 10000 10000

z)Max number of open files, %s, is too low.)$_cached_multiprocessing_is_availabler{   )_cached_multiprocessing_check_stack_tracewarn,_cached_multiprocessing_is_available_messager   r   rH   rI   Managertop_level_manager_HAS_RESOURCE_MODULEmaxrg   r^   RLIMIT_NOFILEr   MIN_ACCEPTABLE_OPEN_FILES_LIMITAttributeErrorRLIMIT_OFILEr}   	traceback
format_exc)loggershould_prohibit_multiprocessingr   messager   rG   limits          r   $CheckMultiprocessingAvailableAndInitr      s   4 *5ll<=kk>?+9=? ? .K-L*!$ G kk'+8<> > +!%'=##C+ 0779
 E)&&99;<)%%99;< y888   g AEIJJ 90 *F&.9+18.	)7;
= =}  g &    &&&(K#( ll;kk's`   E- #!F 2E8 82F **F -E55F 8
FF FF 
FF FF >Gc                      [        5       R                  (       a  [        R                  5       $ [        R                  " 5       $ )zReturns either a multiprocessing lock or a threading lock.

Use Multiprocessing lock iff we have access to the parts of the
multiprocessing module that are necessary to enable parallelism in operations.

Returns:
  Multiprocessing or threading lock.
)r   r   r   r   r   rA   r   r   
CreateLockr     s,     *+88!!##>>r   c                     SnU(       d   U R                  XS9  SnU(       d  M  gg! [        R                   a     N!f = f)a#  Puts an item to the status queue.

If the queue is full, this function will timeout periodically and repeat
until success. This avoids deadlock during shutdown by never making a fully
blocking call to the queue, since Python signal handlers cannot execute
in between instructions of the Python interpreter (see
https://docs.python.org/2/library/signal.html for details).

Args:
  queue: Queue class (typically the global status queue)
  msg: message to post to the queue.
  timeout: (optional) amount of time to wait before repeating put request.
F)timeoutTN)putQueueFull)r	   msgr   put_successs       r   PutToQueueWithTimeoutr     sC     +iii%k K :: 
s   ' >>r   )+r?   
__future__r   r   r   r   collectionsrx   rz   multiprocessingr   r   gslib.utilsr   r   	six.movesr	   r   r^   r   ImportErrorr   SEEK_AHEAD_JOIN_TIMEOUTSTATUS_QUEUE_OP_TIMEOUTUI_THREAD_JOIN_TIMEOUTZERO_TASKS_TO_DO_ARGUMENTr   r   r   
namedtupler   get_contextrH   r   ra   objectr   rC   rg   r   r   r   r   rA   r   r   <module>r      s   A & %  '       ! # $      <  (, $,0 )/3 , $/#9#9&(G$I  
,+77?
; ;|2f 2j/d*Zz=z$ /F S  H 	
# ,+,s*   B- 0C -B?3B::B?
CC