
    5                     ^   S r SSKJr  SSKJr  SSKJr  SSKrSSKrSSKrSSKrSSK	r	SSK
Jr  SSKrSSKJr  SSKJr  SS	KJr  SrS
r " S S\5      r " S S\5      r\R,                  " \R.                  5       " S S\5      5       r\R,                  " \R.                  5       " S S\5      5       r " S S\5      r " S S\5      r " S S\5      r " S S\5      r " S S\5      r " S S\5      r  " S S \5      r! " S! S"\5      r" " S# S$\RF                  5      r$ " S% S&\5      r%S' r&g)(a.  Parallel execution pools based on multithreading.

This module provides 2 types of pools:
- NullPool: executes work synchronously, in the current process
- ThreadPool: executes work across multiple threads

It also contains a convenience method GetPool to get the appropriate pool for
the given number of threads.

The general usage is as follows:

>>> def identity(value): return value
>>> with parallel.GetPool(num_threads) as pool:
...   future = pool.ApplyAsync(identity, (42,))
...   assert future.Get() == 42
...   assert pool.Apply(f, (42,)) == 42
...   map_future = pool.MapAsync(identity, [1, 2, 3])
...   assert map_future.Get() == [1, 2, 3]
...   assert pool.Map(identity, [1, 2, 3]) == [1, 2, 3]

Errors are raised at the time of the Get() call on the future (which is implicit
for Apply() and Map()).
    )absolute_import)division)unicode_literalsN)
exceptions)map)queue)rangeg{Gz?c                       \ rS rSrSrSrg)UnsupportedPlatformExceptionA   zHException indicating that a pool was created on an unsupported platform. N)__name__
__module____qualname____firstlineno____doc____static_attributes__r       (lib/googlecloudsdk/core/util/parallel.pyr   r   A   s    Pr   r   c                   ,   ^  \ rS rSrSrU 4S jrSrU =r$ )InvalidStateExceptionE   zGException to indicate that a parallel pool was put in an invalid state.c                 ,   > [         [        U ]  U5        g N)superr   __init__)selfmsg	__class__s     r   r   InvalidStateException.__init__H   s    	
/4r   r   )r   r   r   r   r   r   r   __classcell__r   s   @r   r   r   E   s    O5 5r   r   c                       \ rS rSrSr\R                  S 5       r\R                  S 5       rS r	S r
S rS r\R                  S	 5       rS
 rS rSrg)BasePoolL   zBase class for parallel pools.

Provides a limited subset of the multiprocessing.Pool API.

Can be used as a context manager:

>>> with pool:
...  assert pool.Map(str, [1, 2, 3]) == ['1', '2', '3']
c                     [         e)zFInitialize non-trivial infrastructure (e.g. processes/threads/queues).NotImplementedErrorr   s    r   StartBasePool.StartX   
     r   c                     [         e)z%Clean up anything started in Start().r'   r)   s    r   JoinBasePool.Join]   r,   r   c                 @    U R                  X5      R                  5       $ )zFApplies func to each element in iterable and return a list of results.)MapAsyncGetr   funciterables      r   MapBasePool.Mapb   s    ==(,,..r   c           	      b    [        U Vs/ s H  o0R                  X45      PM     sn5      $ s  snf )z=Applies func to each element in iterable and return a future.)_MultiFuture
ApplyAsync)r   r4   r5   args       r   r1   BasePool.MapAsyncf   s(    (K(3v6(KLLKs   ,c                 @    U R                  X5      R                  5       $ )ah  Applies func to each element in iterable and return a generator.

The generator yields the result immediately after the task is done. So
result for faster task will be yielded earlier than for slower task.

Args:
  func: a function object
  iterable: an iterable object and each element is the arguments to func

Returns:
  A generator to produce the results.
)r1   GetResultsEagerFetchr3   s      r   MapEagerFetchBasePool.MapEagerFetchj   s     ==(==??r   c                 @    U R                  X5      R                  5       $ )z,Applies func to args and returns the result.)r:   r2   r   r4   argss      r   ApplyBasePool.Applyy   s    ??4&**,,r   c                     [         e)z'Apply func to args and return a future.r'   rB   s      r   r:   BasePool.ApplyAsync}   r,   r   c                 &    U R                  5         U $ r   )r*   r)   s    r   	__enter__BasePool.__enter__   s    JJLKr   c                 $    U R                  5         g r   )r.   )r   exc_typeexc_valexc_tbs       r   __exit__BasePool.__exit__   s    IIKr   r   N)r   r   r   r   r   abcabstractmethodr*   r.   r6   r1   r?   rD   r:   rI   rO   r   r   r   r   r$   r$   L   su        /M@-  r   r$   c                   f    \ rS rSrSrS r\R                  S 5       r\R                  S 5       r	Sr
g)
BaseFuture   zAA future object containing a value that may not be available yet.c                 >    U R                  5       R                  5       $ r   	GetResult
GetOrRaiser)   s    r   r2   BaseFuture.Get   s    >>&&((r   c                     [         er   r'   r)   s    r   rX   BaseFuture.GetResult       
r   c                     [         er   r'   r)   s    r   DoneBaseFuture.Done   r]   r   r   N)r   r   r   r   r   r2   rQ   rR   rX   r_   r   r   r   r   rT   rT      s>    I)    r   rT   c                   4    \ rS rSrSrS	S jrS rS rS rSr	g)
_Result   am  Value holder for a result (a value, if successful, or an error).

Only one of {value, error, exc_info} can be set.

Both error and exc_info exist due to issues with pickling. exc_info is better,
because we can re-raise it and preserve the original stacktrace, but it can't
be pickled. error gets re-raised from GetOrRaise().

Attributes:
  result: one-tuple of any object (optional), the result of the function. It's
    a one-tuple to distinguish a result of None from no result.
  error: Exception (optional), an exception that was thrown by the function
  exc_info: exc_info (optional) for the exception that occurred
Nc                     [        [        [        XU/5      5      S:  a  [        S5      eU(       d  U(       d  U(       d  [        S5      eXl        X l        X0l        g )N   z:_Result may only have one of [value, error, exc_info] set.z6_Result must have one of [value, error, exc_info] set.)sumr   bool
ValueErrorvalueerrorexc_info)r   ri   rj   rk   s       r   r   _Result.__init__   sO    
3teH-./!3   UhOPPJJMr   c                     U R                   (       a  U R                   S   $ U R                  (       a  U R                  e[        R                  " U R                  S   U R                  S   S9  g )Nr   re      )tb)ri   rj   r   reraiserk   r)   s    r   rY   _Result.GetOrRaise   sJ    zzZZ]	JJq)dmmA.>?r   c                    U R                   (       a  [        U R                   S   S9nOU n [        R                  " U5        U$ ! [        R                   a  n[        US9s SnA$ SnAf[
         aK  n[        [        R                  " SR                  U[        R                  " U5      5      5      S9s SnA$ SnAff = f)zReturn a pickleable version of this _Result.

Traceback objects can't be pickled, so we just pass through the exc_value.
Also, some values and exceptions can't be pickled.

Returns:
  _Result: a pickleable version of this result.
re   rj   Nz!Couldn't pickle result [{0}]: {1})	rk   rb   pickledumpsPicklingError	Exceptionformatsix	text_type)r   pickleable_resulterrs      r   ToPickleableResult_Result.ToPickleableResult   s     }}!a(897ll$%    3 76//
-
4
4s!356 7 77s0   A B>A& B>&B>3A B93B>9B>c                 d    SR                  U R                  U R                  U R                  5      $ )Nz+_Result(value={0}, error={1}, exc_info={2}))rx   ri   rj   rk   r)   s    r   __str___Result.__str__   s(    8??

DJJ/ /r   )rj   rk   ri   )NNN)
r   r   r   r   r   r   rY   r}   r   r   r   r   r   rb   rb      s    @0/r   rb   c                   (   ^  \ rS rSrU 4S jrSrU =r$ )
MultiError   c           	      t   > Xl         S n[        [        U ]  SSR	                  [        X!5      5      -   5        g )Nc                 t    SR                  [        U 5      R                  [        R                  " U 5      5      $ )Nz{}: {})rx   typer   ry   rz   )es    r   <lambda>%MultiError.__init__.<locals>.<lambda>   s"    8??47#3#3S]]15EFr   zOne or more errors occurred:
z

)errorsr   r   r   joinr   )r   r   fnr   s      r   r   MultiError.__init__   s4    K	FB	*d$(CO$	%&r   )r   )r   r   r   r   r   r   r!   r"   s   @r   r   r      s    & &r   r   c                   0    \ rS rSrSrS rS rS rS rSr	g)	r9      zFuture object that combines other Future objects.

Returns the results of each future when they are all ready.

Attributes:
  futures: list of BaseFuture.
c                     Xl         g r   futures)r   r   s     r   r   _MultiFuture.__init__   s    Lr   c                    / n/ nU R                    H#  n UR                  UR                  5       5        M%     U(       a  [	        [        U5      S9$ [	        U4S9$ ! [         a  nUR                  U5         S nAMk  S nAff = f)Nrs   )ri   )r   appendr2   rw   rb   r   )r   resultsr   futurer|   s        r   rX   _MultiFuture.GetResult   st    GF,,vzz|$ 
 :f-..'$$	  cs   A
B%A<<Bc                 r    [        U R                   Vs/ s H  oR                  5       PM     sn5      $ s  snf r   )allr   r_   )r   r   s     r   r_   _MultiFuture.Done   s'    T\\:\6\:;;:s   4c              #   B  #    U R                   nU(       al  / nU H>  nUR                  5       (       a   UR                  5       v   M-  UR	                  U5        M@     Un[
        R                  " [        5        U(       a  Mk  gg! [         a  nUv    SnAM}  SnAff = f7f)zCollect the results of futures.

Results are yielded immediately after the task is done. So
result for faster task will be yielded earlier than for slower task.

Yields:
  result which is done.
N)r   r_   r2   rw   r   timesleep_POLL_INTERVAL)r   uncollected_futurenext_uncollected_futurer   r|   s        r   r>   !_MultiFuture.GetResultsEagerFetch   s      
 "&&;;==**, "
(
(
0 ' 3
jj  
  IIs4   0BB:BB
BBBBBr   N)
r   r   r   r   r   r   rX   r_   r>   r   r   r   r   r9   r9      s    
%<!r   r9   c                   0    \ rS rSrSrS rS rS rS rSr	g)	_Taski  zAn individual work unit to be performed in parallel.

Attributes:
  func: callable, a function to be called with the given arguments. Must be
    serializable.
  args: tuple, the arguments to pass to func. Must be serializable.
c                     Xl         X l        g r   )r4   rC   rB   s      r   r   _Task.__init__  s    IIr   c                 X    [        U R                  R                  U R                  45      $ r   )hashr4   r   rC   r)   s    r   __hash___Task.__hash__#  s     ##TYY/00r   c                     U R                   R                  UR                   R                  :H  =(       a    U R                  UR                  :H  $ r   )r4   r   rC   r   others     r   __eq___Task.__eq__&  s3    99!4!44Pejj9PPr   c                 .    U R                  U5      (       + $ r   )r   r   s     r   __ne___Task.__ne__)  s    {{5!!!r   )rC   r4   N)
r   r   r   r   r   r   r   r   r   r   r   r   r   r   r     s    1Q"r   r   c                   &    \ rS rSrS rS rS rSrg)_NullFuturei2  c                     Xl         g r   result)r   r   s     r   r   _NullFuture.__init__4  s    Kr   c                     U R                   $ r   r   r)   s    r   rX   _NullFuture.GetResult7  s    ;;r   c                     g)NTr   r)   s    r   r_   _NullFuture.Done:  s    r   r   N)r   r   r   r   r   rX   r_   r   r   r   r   r   r   2  s    r   r   c                   0    \ rS rSrSrS rS rS rS rSr	g)	NullPooli>  z)Serial analog of parallel execution Pool.c                     SU l         g )NF_startedr)   s    r   r   NullPool.__init__A  s	    DMr   c                     U R                   (       d  [        S5      e [        U" U6 45      n[        U5      $ !   [        [        R                  " 5       S9n N+= f)N&NullPool must be Start()ed before use.rk   )r   r   rb   sysrk   r   )r   r4   rC   r   s       r   r:   NullPool.ApplyAsyncD  sP    == ""JKK0d~&f v0/fs	   8 Ac                 J    U R                   (       a  [        S5      eSU l         g )NzCan only start NullPool once.Tr   r   r)   s    r   r*   NullPool.StartP  s    }}!"ABBDMr   c                 <    U R                   (       d  [        S5      eg )Nr   r   r)   s    r   r.   NullPool.JoinU  s    == ""JKK	 r   r   N)
r   r   r   r   r   r   r:   r*   r.   r   r   r   r   r   r   >  s    1

Lr   r   c                   ,    \ rS rSrS rS rS rS rSrg)_ThreadFutureic  c                     Xl         X l        g r   _task_results_map)r   taskresults_maps      r   r   _ThreadFuture.__init__e  s    J#r   c                 >    U R                  5       R                  5       $ )z6Return the value of the future, or raise an exception.rW   r)   s    r   r2   _ThreadFuture.Geti  s    >>&&((r   c                      U R                   U R                  ;   a  U R                  U R                      $ [        R                  " [        5        MO  )zGet the _Result of the future.)r   r   r   r   r   r)   s    r   rX   _ThreadFuture.GetResultm  s=    
	t((	(  ,,
jj  r   c                 4    U R                   U R                  ;   $ )z8Return True if the task finished with or without errors.r   r)   s    r   r_   _ThreadFuture.Donet  s    ::****r   )r   r   N)	r   r   r   r   r   r2   rX   r_   r   r   r   r   r   r   c  s    $)!+r   r   c                       \ rS rSrS rSrg)_ThreadTaskiy  c                     Xl         g r   r   )r   r   s     r   r   _ThreadTask.__init__{  s    Ir   r   N)r   r   r   r   r   r   r   r   r   r   r   y  s    r   r   c                   .   ^  \ rS rSrU 4S jrS rSrU =r$ )_WorkerThreadi  c                 B   > [         [        U ]  5         Xl        X l        g r   )r   r   r   
work_queuer   )r   r   r   r   s      r   r   _WorkerThread.__init__  s    	-') O"r   c                 &    U R                   R                  5       nU[        L a  g UR                  n [	        UR
                  " UR                  6 45      nX0R                  UR                  '   Mn  !   [	        [        R                  " 5       S9n N:= f)Nr   )
r   get_STOP_WORKINGr   rb   r4   rC   r   rk   r   )r   thread_taskr   r   s       r   run_WorkerThread.run  s~    
OO'')k		%d2$))TYY/12 ,2{''( 2#,,.1s   #A0 0B)r   r   )r   r   r   r   r   r   r   r!   r"   s   @r   r   r     s    #

2 
2r   r   c                   0    \ rS rSrSrS rS rS rS rSr	g)	
ThreadPooli  z%Thread-based parallel execution Pool.c                 `    Xl         [        R                  " 5       U l        / U l        0 U l        g r   )num_threadsr   Queue_task_queueworker_threadsr   )r   r   s     r   r   ThreadPool.__init__  s&    "{{}DDDr   c                 
   U R                   (       a  [        S5      e[        U R                  5       HN  n[	        U R
                  U R                  5      nU R                   R                  U5        UR                  5         MP     g )Nz(ThreadPool must be started at most once.)	r   r   r	   r   r   r   r   r   startr   _threads      r   r*   ThreadPool.Start  sa    !"LMM4##$T--t/@/@Af
  (lln %r   c                     U R                   (       d  [        S5      e[        X5      n[        X0R                  5      nU R
                  R                  [        U5      5        U$ Nz(ThreadPool must be Start()ed before use.)r   r   r   r   r   r   putr   )r   r4   rC   r   r   s        r   r:   ThreadPool.ApplyAsync  sO    !"LMMD4!2!23FT*+Mr   c                     U R                   (       d  [        S5      eU R                    H"  nU R                  R                  [        5        M$     U R                    H  nUR                  5         M     g r   )r   r   r   r   r   r   r   s      r   r.   ThreadPool.Join  sV    !"LMM  
=) ! %%kkm &r   )r   r   r   r   N)
r   r   r   r   r   r   r*   r:   r.   r   r   r   r   r   r     s    -r   r   c                 8    U S:X  a
  [        5       $ [        U 5      $ )a-  Returns a parallel execution pool for the given number of threads.

Can return either:
- NullPool: if num_threads is 1.
- ThreadPool: if num_threads is greater than 1

Args:
  num_threads: int, the number of threads to use.

Returns:
  BasePool instance appropriate for the given type of parallelism.
re   )r   r   )r   s    r   GetPoolr    s     A:k""r   )'r   
__future__r   r   r   rQ   rt   r   	threadingr   googlecloudsdk.corer   ry   	six.movesr   r   r	   r   r   rw   r   r   add_metaclassABCMetaobjectr$   rT   rb   r   r9   r   r   r   r   r   Threadr   r   r  r   r   r   <module>r     s7  0 '  ' 
  
   * 
    Q9 Q5I 5 3;;:v :  :z 3;;   </f </~& &0!: 0!f"F "8	* 	Lx LJ+J +,& 2I$$ 2(   P#r   