o
    5                     @   s  d Z ddlmZ ddlmZ ddlmZ ddlZddlZddlZddlZddl	Z	ddl
mZ ddlZddlmZ ddlmZ dd	lmZ dZd
ZG dd deZG dd deZeejG dd deZeejG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZ G dd  d eZ!G d!d" d"eZ"G d#d$ d$ej#Z$G d%d& d&eZ%d'd( Z&dS ))a.  Parallel execution pools based on multithreading.

This module provides 2 types of pools:
- NullPool: executes work synchronously, in the current process
- ThreadPool: executes work across multiple threads

It also contains a convenience method GetPool to get the appropriate pool for
the given number of threads.

The general usage is as follows:

>>> def identity(value): return value
>>> with parallel.GetPool(num_threads) as pool:
...   future = pool.ApplyAsync(identity, (42,))
...   assert future.Get() == 42
...   assert pool.Apply(f, (42,)) == 42
...   map_future = pool.MapAsync(identity, [1, 2, 3])
...   assert map_future.Get() == [1, 2, 3]
...   assert pool.Map(identity, [1, 2, 3]) == [1, 2, 3]

Errors are raised at the time of the Get() call on the future (which is implicit
for Apply() and Map()).
    )absolute_import)division)unicode_literalsN)
exceptions)map)queue)rangeg{Gz?c                   @   s   e Zd ZdZdS )UnsupportedPlatformExceptionzHException indicating that a pool was created on an unsupported platform.N)__name__
__module____qualname____doc__ r   r   >/tmp/google-cloud-sdk/lib/googlecloudsdk/core/util/parallel.pyr	   A   s    r	   c                       s    e Zd ZdZ fddZ  ZS )InvalidStateExceptionzGException to indicate that a parallel pool was put in an invalid state.c                    s   t t| | d S N)superr   __init__)selfmsg	__class__r   r   r   H      zInvalidStateException.__init__)r
   r   r   r   r   __classcell__r   r   r   r   r   E   s    r   c                   @   sj   e Zd ZdZejdd Zejdd Zdd Zdd	 Z	d
d Z
dd Zejdd Zdd Zdd ZdS )BasePoolzBase class for parallel pools.

  Provides a limited subset of the multiprocessing.Pool API.

  Can be used as a context manager:

  >>> with pool:
  ...  assert pool.Map(str, [1, 2, 3]) == ['1', '2', '3']
  c                 C      t )zFInitialize non-trivial infrastructure (e.g. processes/threads/queues).NotImplementedErrorr   r   r   r   StartX      zBasePool.Startc                 C   r   )z%Clean up anything started in Start().r   r   r   r   r   Join]   r    zBasePool.Joinc                 C      |  || S )zFApplies func to each element in iterable and return a list of results.)MapAsyncGetr   funciterabler   r   r   Mapb      zBasePool.Mapc                    s   t  fdd|D S )z=Applies func to each element in iterable and return a future.c                    s   g | ]	}  |fqS r   )
ApplyAsync).0argr&   r   r   r   
<listcomp>h   s    z%BasePool.MapAsync.<locals>.<listcomp>)_MultiFuturer%   r   r-   r   r#   f   s   zBasePool.MapAsyncc                 C   r"   )a  Applies func to each element in iterable and return a generator.

    The generator yields the result immediately after the task is done. So
    result for faster task will be yielded earlier than for slower task.

    Args:
      func: a function object
      iterable: an iterable object and each element is the arguments to func

    Returns:
      A generator to produce the results.
    )r#   GetResultsEagerFetchr%   r   r   r   MapEagerFetchj   s   zBasePool.MapEagerFetchc                 C   r"   )z,Applies func to args and returns the result.)r*   r$   r   r&   argsr   r   r   Applyy   r)   zBasePool.Applyc                 C   r   )z'Apply func to args and return a future.r   r2   r   r   r   r*   }   r    zBasePool.ApplyAsyncc                 C   s   |    | S r   )r   r   r   r   r   	__enter__   s   zBasePool.__enter__c                 C   s   |    d S r   )r!   )r   exc_typeexc_valexc_tbr   r   r   __exit__      zBasePool.__exit__N)r
   r   r   r   abcabstractmethodr   r!   r(   r#   r1   r4   r*   r5   r9   r   r   r   r   r   L   s    



r   c                   @   s4   e Zd ZdZdd Zejdd Zejdd ZdS )	
BaseFuturezAA future object containing a value that may not be available yet.c                 C      |    S r   	GetResult
GetOrRaiser   r   r   r   r$      r:   zBaseFuture.Getc                 C   r   r   r   r   r   r   r   r@         zBaseFuture.GetResultc                 C   r   r   r   r   r   r   r   Done   rB   zBaseFuture.DoneN)	r
   r   r   r   r$   r;   r<   r@   rC   r   r   r   r   r=      s    
r=   c                   @   s2   e Zd ZdZdddZdd Zdd Zd	d
 ZdS )_Resulta  Value holder for a result (a value, if successful, or an error).

  Only one of {value, error, exc_info} can be set.

  Both error and exc_info exist due to issues with pickling. exc_info is better,
  because we can re-raise it and preserve the original stacktrace, but it can't
  be pickled. error gets re-raised from GetOrRaise().

  Attributes:
    result: one-tuple of any object (optional), the result of the function. It's
      a one-tuple to distinguish a result of None from no result.
    error: Exception (optional), an exception that was thrown by the function
    exc_info: exc_info (optional) for the exception that occurred
  Nc                 C   sJ   t tt|||gdkrtd|s|s|std|| _|| _|| _d S )N   z:_Result may only have one of [value, error, exc_info] set.z6_Result must have one of [value, error, exc_info] set.)sumr   bool
ValueErrorvalueerrorexc_info)r   rI   rJ   rK   r   r   r   r      s   
z_Result.__init__c                 C   s:   | j r| j d S | jr| jtj| jd | jd d d S )Nr   rE      )tb)rI   rJ   r   reraiserK   r   r   r   r   rA      s
   
z_Result.GetOrRaisec                 C   s   | j rt| j d d}n| }zt| W |S  tjy. } zt|dW  Y d}~S d}~w tyN } zttd|t|dW  Y d}~S d}~ww )zReturn a pickleable version of this _Result.

    Traceback objects can't be pickled, so we just pass through the exc_value.
    Also, some values and exceptions can't be pickled.

    Returns:
      _Result: a pickleable version of this result.
    rE   rJ   Nz!Couldn't pickle result [{0}]: {1})	rK   rD   pickledumpsPicklingError	Exceptionformatsix	text_type)r   pickleable_resulterrr   r   r   ToPickleableResult   s"   	
z_Result.ToPickleableResultc                 C   s   d | j| j| jS )Nz+_Result(value={0}, error={1}, exc_info={2}))rT   rI   rJ   rK   r   r   r   r   __str__   s   z_Result.__str__)NNN)r
   r   r   r   r   rA   rY   rZ   r   r   r   r   rD      s    

rD   c                       s   e Zd Z fddZ  ZS )
MultiErrorc                    s2   || _ dd }tt| ddt||  d S )Nc                 S   s   d t| jt| S )Nz{}: {})rT   typer
   rU   rV   )er   r   r   <lambda>   s    z%MultiError.__init__.<locals>.<lambda>zOne or more errors occurred:
z

)errorsr   r[   r   joinr   )r   r_   fnr   r   r   r      s   
zMultiError.__init__)r
   r   r   r   r   r   r   r   r   r[      s    r[   c                   @   0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )r/   zFuture object that combines other Future objects.

  Returns the results of each future when they are all ready.

  Attributes:
    futures: list of BaseFuture.
  c                 C   
   || _ d S r   )futures)r   rd   r   r   r   r         
z_MultiFuture.__init__c                 C   sr   g }g }| j D ]"}z	||  W q ty) } z|| W Y d }~qd }~ww |r3tt|dS t|fdS )NrO   )rI   )rd   appendr$   rS   rD   r[   )r   resultsr_   futurerX   r   r   r   r@      s   
z_MultiFuture.GetResultc                 C   s   t dd | jD S )Nc                 S   s   g | ]}|  qS r   )rC   )r+   rh   r   r   r   r.      s    z%_MultiFuture.Done.<locals>.<listcomp>)allrd   r   r   r   r   rC      r   z_MultiFuture.Donec                 c   s    | j }|r>g }|D ](}| r-z| V  W q
 ty, } z	|V  W Y d}~q
d}~ww || q
|}tt |sdS dS )zCollect the results of futures.

    Results are yielded immediately after the task is done. So
    result for faster task will be yielded earlier than for slower task.

    Yields:
      result which is done.
    N)rd   rC   r$   rS   rf   timesleep_POLL_INTERVAL)r   uncollected_futurenext_uncollected_futurerh   rX   r   r   r   r0      s    	
z!_MultiFuture.GetResultsEagerFetchN)r
   r   r   r   r   r@   rC   r0   r   r   r   r   r/      s    r/   c                   @   rb   )_TaskzAn individual work unit to be performed in parallel.

  Attributes:
    func: callable, a function to be called with the given arguments. Must be
      serializable.
    args: tuple, the arguments to pass to func. Must be serializable.
  c                 C      || _ || _d S r   )r&   r3   r2   r   r   r   r        
z_Task.__init__c                 C   s   t | jj| jfS r   )hashr&   r
   r3   r   r   r   r   __hash__#  s   z_Task.__hash__c                 C   s   | j j|j jko| j|jkS r   )r&   r
   r3   r   otherr   r   r   __eq__&  s   z_Task.__eq__c                 C   s   |  | S r   )rv   rt   r   r   r   __ne__)  r:   z_Task.__ne__N)r
   r   r   r   r   rs   rv   rw   r   r   r   r   ro     s    ro   c                   @   s$   e Zd Zdd Zdd Zdd ZdS )_NullFuturec                 C   rc   r   result)r   rz   r   r   r   r   4  re   z_NullFuture.__init__c                 C   s   | j S r   ry   r   r   r   r   r@   7  s   z_NullFuture.GetResultc                 C   s   dS )NTr   r   r   r   r   rC   :  s   z_NullFuture.DoneN)r
   r   r   r   r@   rC   r   r   r   r   rx   2  s    rx   c                   @   rb   )NullPoolz)Serial analog of parallel execution Pool.c                 C   s
   d| _ d S )NF)_startedr   r   r   r   r   A  re   zNullPool.__init__c                 C   sF   | j stdzt|| f}W t|S    tt d}Y t|S )N&NullPool must be Start()ed before use.rK   )r|   r   rD   sysrK   rx   )r   r&   r3   rz   r   r   r   r*   D  s   zNullPool.ApplyAsyncc                 C   s   | j rtdd| _ d S )NzCan only start NullPool once.Tr|   r   r   r   r   r   r   P  s   
zNullPool.Startc                 C   s   | j stdd S )Nr}   r   r   r   r   r   r!   U  s   zNullPool.JoinN)r
   r   r   r   r   r*   r   r!   r   r   r   r   r{   >  s    r{   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
_ThreadFuturec                 C   rp   r   _task_results_map)r   taskresults_mapr   r   r   r   e  rq   z_ThreadFuture.__init__c                 C   r>   )z6Return the value of the future, or raise an exception.r?   r   r   r   r   r$   i     z_ThreadFuture.Getc                 C   s&   	 | j | jv r| j| j  S tt q)zGet the _Result of the future.)r   r   rj   rk   rl   r   r   r   r   r@   m  s
   
z_ThreadFuture.GetResultc                 C   s   | j | jv S )z8Return True if the task finished with or without errors.r   r   r   r   r   rC   t  r   z_ThreadFuture.DoneN)r
   r   r   r   r$   r@   rC   r   r   r   r   r   c  s
    r   c                   @   s   e Zd Zdd ZdS )_ThreadTaskc                 C   rc   r   )r   )r   r   r   r   r   r   {  re   z_ThreadTask.__init__N)r
   r   r   r   r   r   r   r   r   y  s    r   c                       s$   e Zd Z fddZdd Z  ZS )_WorkerThreadc                    s   t t|   || _|| _d S r   )r   r   r   
work_queuer   )r   r   r   r   r   r   r     s   
z_WorkerThread.__init__c                 C   sZ   	 | j  }|tu rd S |j}zt|j|j f}W n   tt d}Y || j	|j< q)NTr~   )
r   get_STOP_WORKINGr   rD   r&   r3   r   rK   r   )r   thread_taskr   rz   r   r   r   run  s   
z_WorkerThread.run)r
   r   r   r   r   r   r   r   r   r   r     s    r   c                   @   rb   )
ThreadPoolz%Thread-based parallel execution Pool.c                 C   s    || _ t | _g | _i | _d S r   )num_threadsr   Queue_task_queueworker_threadsr   )r   r   r   r   r   r     s   

zThreadPool.__init__c                 C   sD   | j rtdt| jD ]}t| j| j}| j | |  qd S )Nz(ThreadPool must be started at most once.)	r   r   r   r   r   r   r   rf   startr   _threadr   r   r   r     s   
zThreadPool.Startc                 C   s8   | j stdt||}t|| j}| jt| |S Nz(ThreadPool must be Start()ed before use.)r   r   ro   r   r   r   putr   )r   r&   r3   r   rz   r   r   r   r*     s   
zThreadPool.ApplyAsyncc                 C   s>   | j std| j D ]}| jt q
| j D ]}|  qd S r   )r   r   r   r   r   r`   r   r   r   r   r!     s   


zThreadPool.JoinN)r
   r   r   r   r   r   r*   r!   r   r   r   r   r     s    r   c                 C   s   | dkrt  S t| S )a=  Returns a parallel execution pool for the given number of threads.

  Can return either:
  - NullPool: if num_threads is 1.
  - ThreadPool: if num_threads is greater than 1

  Args:
    num_threads: int, the number of threads to use.

  Returns:
    BasePool instance appropriate for the given type of parallelism.
  rE   )r{   r   )r   r   r   r   GetPool  s   r   )'r   
__future__r   r   r   r;   rP   r   	threadingrj   googlecloudsdk.corer   rU   	six.movesr   r   r   r   rl   rS   r	   r   add_metaclassABCMetaobjectr   r=   rD   r[   r/   ro   rx   r{   r   r   Threadr   r   r   r   r   r   r   <module>   sB   

=?
3%(