
    [                         S r SSKrSSKrSSKrSSKrSSKJrJrJr  SSK	r	\R                  (       a  SSKJr   " S S\R                  S9rS\R                  R                   4S	 jr " S
 S\5      rg)zSchedulers provide means to *schedule* callbacks asynchronously.

These are used by the subscriber to call the user-provided callback to process
each message.
    N)CallableListOptional)	pubsub_v1c                       \ rS rSrSr\\R                  S\R                  4S j5       5       r\R                  S\
SS4S j5       r\R                   SS\S\S	   4S
 jj5       rSrg)	Scheduler    z_Abstract base class for schedulers.

Schedulers are used to schedule callbacks asynchronously.
returnc                     [         e)zQueue: A concurrency-safe queue specific to the underlying
concurrency implementation.

This queue is used to send messages *back* to the scheduling actor.
NotImplementedErrorselfs    >lib/third_party/google/cloud/pubsub_v1/subscriber/scheduler.pyqueueScheduler.queue&   s
     "!    callbackNc                     [         e)zSchedule the callback to be called asynchronously.

Args:
    callback: The function to call.
    args: Positional arguments passed to the callback.
    kwargs: Key-word arguments passed to the callback.

Returns:
    None
r   r   r   argskwargss       r   scheduleScheduler.schedule0   s
     "!r   await_msg_callbacks$pubsub_v1.subscriber.message.Messagec                     [         e)a+  Shuts down the scheduler and immediately end all pending callbacks.

Args:
    await_msg_callbacks:
        If ``True``, the method will block until all currently executing
        callbacks are done processing. If ``False`` (default), the
        method will not wait for the currently running callbacks to complete.

Returns:
    The messages submitted to the scheduler that were not yet dispatched
    to their callbacks.
    It is assumed that each message was submitted to the scheduler as the
    first positional argument to the provided callback.
r   )r   r   s     r   shutdownScheduler.shutdown>   s
    $ "!r    F)__name__
__module____qualname____firstlineno____doc__propertyabcabstractmethodr   Queuer   r   boolr   r   __static_attributes__r    r   r   r   r       s    
 "u{{ "  " 	" "t " " 	*/"#'"	4	5" "r   r   )	metaclassr
   c                  >    [         R                  R                  SSS9$ )N
   z"ThreadPoolExecutor-ThreadScheduler)max_workersthread_name_prefix)
concurrentfuturesThreadPoolExecutorr    r   r   "_make_default_thread_pool_executorr5   S   s&    00+O 1  r   c                       \ rS rSrSr SS\\R                  R                     4S jjr	\
S 5       rS\SS4S	 jr SS
\S\S   4S jjrSrg)ThreadSchedulerY   a  A thread pool-based scheduler. It must not be shared across
   SubscriberClients.

This scheduler is useful in typical I/O-bound message processing.

Args:
    executor:
        An optional executor to use. If not specified, a default one
        will be created.
Nexecutorc                 j    [         R                  " 5       U l        Uc  [        5       U l        g Xl        g N)r   r*   _queuer5   	_executor)r   r9   s     r   __init__ThreadScheduler.__init__e   s&     $);;=?ADN%Nr   c                     U R                   $ )z^Queue: A thread-safe queue used for communication between callbacks
and the scheduling thread.)r<   r   s    r   r   ThreadScheduler.queuen   s     {{r   r   r
   c                      U R                   R                  " U/UQ70 UD6  g! [         a    [        R                  " S[
        SS9   gf = f)zSchedule the callback to be called asynchronously in a thread pool.

Args:
    callback: The function to call.
    args: Positional arguments passed to the callback.
    kwargs: Key-word arguments passed to the callback.

Returns:
    None
z.Scheduling a callback after executor shutdown.   )category
stacklevelN)r=   submitRuntimeErrorwarningswarnRuntimeWarningr   s       r   r   ThreadScheduler.schedulet   sG    	NN!!(<T<V< 	MM@'	s    # $A
	A
r   r   c                    / n  U R                   R                  R                  SS9nUc  M)  UR                  UR                  S   5        MH  ! [
        R                   a     Of = fU R                   R                  US9  U$ )a/  Shut down the scheduler and immediately end all pending callbacks.

Args:
    await_msg_callbacks:
        If ``True``, the method will block until all currently executing
        executor threads are done processing. If ``False`` (default), the
        method will not wait for the currently running threads to complete.

Returns:
    The messages submitted to the scheduler that were not yet dispatched
    to their callbacks.
    It is assumed that each message was submitted to the scheduler as the
    first positional argument to the provided callback.
F)blockr   )wait)r=   _work_queuegetappendr   r   Emptyr   )r   r   dropped_messages	work_items       r   r   ThreadScheduler.shutdown   s    " 	 NN66:::G	$ ''	q(9:	 
 {{ 		 	%89s   A	A A$#A$)r=   r<   r;   r!   )r"   r#   r$   r%   r&   r   r2   r3   r4   r>   r'   r   r   r   r+   r   r   r,   r    r   r   r7   r7   Y   sx    	 KO& !3!3!F!FG&  
 t * +0" #'" 	4	5"  " r   r7   )r&   r(   concurrent.futuresr2   r   typingr   r   r   rH   TYPE_CHECKINGgoogle.cloudr   ABCMetar   r3   r4   r5   r7   r    r   r   <module>r[      sd        + + 	&0"#++ 0"fJ,>,>,Q,Q Q i Q r   