
    @                         S SK r S SKJr  S SKJrJr  S SKJrJr  S SK	J
r
  S SKJrJr  S SKJr  S SKJr  S S	KJr  S S
KJr  S SKJr  S SKJrJrJrJrJrJrJr  S SK J!r!   " S S\\\\4   5      r"g)    N)deepcopy)OptionalList)GoogleAPICallErrorFailedPrecondition)wait_ignore_errors)
ConnectionConnectionFactory)ConnectionReinitializer)FlowControlBatcher)is_reset_signal)RetryingConnection)
Subscriber)SubscribeRequestSubscribeResponseFlowControlRequestSequencedMessageInitialSubscribeRequestSeekRequestCursor)SubscriberResetHandlerc                      \ rS rSr% \\S'   \\S'   \\\	4   \S'   \
\S'   \\S'   \\   \S'   S\S	'   \\R                     \S
'   \\R                     \S'   S\S\S\\\	4   S\
4S jrS rS rS rS\	4S jrS rS rS rS rS\4S jrS\\\	4   4S jrS\\R@                  RB                     4S jr"S \#4S! jr$S"r%g#)$SubscriberImpl1   _base_initial_token_flush_seconds_connection_reset_handler_outstanding_flow_control_last_received_offsetz-asyncio.Queue[List[SequencedMessage.meta.pb]]_message_queue	_receiver_flusherbase_initialtoken_flush_secondsfactoryreset_handlerc                     Xl         X l        [        X05      U l        X@l        [        5       U l        SU l        S U l        [        R                  " 5       U l        S U l        S U l        g )NF)r   r   r   r   r   r   r   _reinitializingr    asyncioQueuer!   r"   r#   )selfr$   r%   r&   r'   s        Hlib/third_party/google/cloud/pubsublite/internal/wire/subscriber_impl.py__init__SubscriberImpl.__init__B   sZ     *$7!-g<+);)=&$%)"%mmo    c                 V   #    U R                   R                  5       I S h  vN   U $  N7fN)r   
__aenter__r,   s    r-   r3   SubscriberImpl.__aenter__T   s'     ))+++ 	,s   )')c                     U R                   b   eU R                  b   e[        R                  " U R	                  5       5      U l         [        R                  " U R                  5       5      U l        g r2   )r"   r#   r*   ensure_future_receive_loop_flush_loopr4   s    r-   _start_loopersSubscriberImpl._start_loopersX   sX    ~~%%%}}$$$ ..t/A/A/CD--d.>.>.@Ar0   c                 R  #    U R                   (       a>  U R                   R                  5         [        U R                   5      I S h  vN   S U l         U R                  (       a?  U R                  R                  5         [        U R                  5      I S h  vN   S U l        g g  N\ N7fr2   )r"   cancelr   r#   r4   s    r-   _stop_loopersSubscriberImpl._stop_loopers^   st     >>NN!!#$T^^444!DN==MM  "$T]]333 DM  5 4s%   AB'B#AB'B%B'%B'responsec           	      j   SU;  a%  U R                   R                  [        S5      5        g [        UR                  R                  R
                  5      nU R                  R                  U5        U H  nU R                  bx  UR                  R                  U R                  ::  aT  U R                   R                  [        SR                  UR                  R                  U R                  5      5      5          g UR                  R                  U l        M     U R                  R                  U5        g )Nmessagesz@Received an invalid subsequent response on the subscribe stream.zfReceived an invalid out of order message from the server. Message is {}, previous last received is {}.)r   failr   listrB   _pbr   on_messagesr    cursoroffsetformatr!   
put_nowait)r,   r@   rB   messages       r-   _handle_responseSubscriberImpl._handle_responseh   s   X%!!"V
 &&**
 	&&228<G**6NN))T-G-GG  %%& A  H  H#NN1143M3M )0)>)>D&   	&&x0r0   c                 z   #     U R                   R                  5       I S h  vN nU R                  U5        M5   N7fr2   )r   readrL   )r,   r@   s     r-   r8   SubscriberImpl._receive_loop   s5     !--2244H!!(+ 4s   ;9;c                    #    U R                   R                  5       nUc  g  U R                  R                  [	        US95      I S h  vN   g  N! [
         a     g f = f7f)Nflow_control)r   release_pending_requestr   writer   r   )r,   reqs     r-   _try_send_tokensSubscriberImpl._try_send_tokens   sW     ,,DDF;	""(()9s)KLLL! 		s:   A%A AA AA 
AAAAc                    #     [         R                  " U R                  5      I S h  vN   U R                  5       I S h  vN   MB   N N7fr2   )r*   sleepr   rW   r4   s    r-   r9   SubscriberImpl._flush_loop   s=     -- 9 9:::''))) :)s    %A
AA
A A
A
c                    #    U R                  5       I S h  vN   U R                  R                  XU5      I S h  vN   g  N) N7fr2   )r>   r   	__aexit__)r,   exc_typeexc_valexc_tbs       r-   r]   SubscriberImpl.__aexit__   s;       """((FCCC 	#Cs   AA #AAAAerrorc                   #    U R                  5       I S h  vN   [        U5      (       a  U R                  R                  5       (       dy  U R                  R	                  5       n[        S U 5       5      nU R                  R                  [        [        U5      US95        U R                  R                  5       (       d  My  U R                  R                  5       I S h  vN   S U l        g g  N N7f)Nc              3   8   #    U  H  oR                   v   M     g 7fr2   )
size_bytes).0rK   s     r-   	<genexpr>1SubscriberImpl.stop_processing.<locals>.<genexpr>   s     #Le7$6$6es   )allowed_messagesallowed_bytes)r>   r   r!   empty
get_nowaitsumr   addr   lenr   handle_resetr    )r,   rb   batchrj   s       r-   stop_processingSubscriberImpl.stop_processing   s       """5!!))//118<8K8K8V8V8X ##Le#L L..22&),U&3 ))//11 %%22444)-D& " 	# 5s(   C2C.B*C2C2 C0!C20C2
connectionc                 N  #    [        U R                  5      nU R                  b#  [        [	        U R                  S-   S9S9Ul        O&[        [        R                  R                  S9Ul        UR                  [        US95      I S h  vN   UR                  5       I S h  vN nSU;  a%  U R                  R                  [        S5      5        g U R                  R                  5       nUb   UR                  [        US95      I S h  vN   U R!                  5         g  N N N7f)	N   )rH   )rG   )named_target)initialrx   z=Received an invalid initial response on the subscribe stream.rR   )r   r   r    r   r   initial_locationNamedTargetCOMMITTED_CURSORrU   r   rO   r   rC   r   r   request_for_restartr:   )r,   rt   rx   r@   tokenss        r-   reinitializeSubscriberImpl.reinitialize   s     4--.%%1'2T%?%?!%CD(G$ (3(44EE(G$ /@AAA#**H$!!"S
 //CCE""#3#HIII 	B* Js7   BD%	D
D%!D!"A'D%	D#
D%!D%#D%returnc                    #    U R                   R                  U R                  R                  5       5      I S h  vN $  N7fr2   )r   await_unless_failedr!   getr4   s    r-   rO   SubscriberImpl.read   s1     %%99$:M:M:Q:Q:STTTTs   7A >A requestc                 :    U R                   R                  U5        g r2   )r   rn   )r,   r   s     r-   
allow_flowSubscriberImpl.allow_flow   s    &&**73r0   )
r   r   r#   r    r!   r   r"   r)   r   r   N)&__name__
__module____qualname____firstlineno__r   __annotations__floatr   r   r   r   r   r   intr*   Futurer
   r.   r3   r:   r>   rL   r8   rW   r9   r]   r   rr   r	   r~   r   r   metapbrO   r   r   __static_attributes__ r0   r-   r   r   1   s    +*#$46G$GHH**11#C=(CC''w~~&&- # ##35F#FG	
 .$B!1): 1<,
*
D.+= ."$%57H%HI4UD!1!6!6!9!9: U4"4 4r0   r   )#r*   copyr   typingr   r   google.api_core.exceptionsr   r   6google.cloud.pubsublite.internal.wait_ignore_cancelledr   0google.cloud.pubsublite.internal.wire.connectionr	   r
   >google.cloud.pubsublite.internal.wire.connection_reinitializerr   :google.cloud.pubsublite.internal.wire.flow_control_batcherr   2google.cloud.pubsublite.internal.wire.reset_signalr   9google.cloud.pubsublite.internal.wire.retrying_connectionr   0google.cloud.pubsublite.internal.wire.subscriberr   google.cloud.pubsublite_v1r   r   r   r   r   r   r   >google.cloud.pubsublite.internal.wire.subscriber_reset_handlerr   r   r   r0   r-   <module>r      sc      ! M U O X G  
\4'(8:K(KL\4r0   