
                             S SK r S SKJrJr  S SKrS SKJr  S SKJr  S SK	J
r
Jr  S SKJrJr  S SKJr  S SKJr  S S	KJr  S S
KJrJrJr  \R2                  " \5      r " S S\\\\4   5      rg)    N)OptionalList)wait_ignore_errors)	Committer)RetryingConnectionConnectionFactory)FailedPreconditionGoogleAPICallError)ConnectionReinitializer)
Connection)Cursor)StreamingCommitCursorRequestStreamingCommitCursorResponseInitialCommitCursorRequestc                   Z   \ rS rSr% \\S'   \\S'   \\\	4   \S'   \
\   \S'   \\   \S'   \
\R                     \S'   \
\R                     \S'   \R                  \S	'   S
\S\S\\\	4   4S jrS rS rS rS\	4S jrS rS rS rS rS rS\SS4S jrS\4S jrS\\\	4   4S jrS r g)!CommitterImpl*   _initial_flush_seconds_connection_next_to_commit_outstanding_commits	_receiver_flusher_emptyinitialflush_secondsfactoryc                     Xl         X l        [        X05      U l        S U l        / U l        S U l        S U l        [        R                  " 5       U l
        U R                  R                  5         g N)r   r   r   r   r   r   r   r   asyncioEventr   set)selfr   r   r   s       Glib/third_party/google/cloud/pubsublite/internal/wire/committer_impl.py__init__CommitterImpl.__init__=   sU      +-g<#$&!mmo    c                 V   #    U R                   R                  5       I S h  vN   U $  N7fr    )r   
__aenter__r$   s    r%   r*   CommitterImpl.__aenter__O   s'     ))+++ 	,s   )')c                     U R                   b   eU R                  b   e[        R                  " U R	                  5       5      U l         [        R                  " U R                  5       5      U l        g r    )r   r   r!   ensure_future_receive_loop_flush_loopr+   s    r%   _start_loopersCommitterImpl._start_loopersS   sX    ~~%%%}}$$$ ..t/A/A/CD--d.>.>.@Ar(   c                 R  #    U R                   (       a>  U R                   R                  5         [        U R                   5      I S h  vN   S U l         U R                  (       a?  U R                  R                  5         [        U R                  5      I S h  vN   S U l        g g  N\ N7fr    )r   cancelr   r   r+   s    r%   _stop_loopersCommitterImpl._stop_loopersY   st     >>NN!!#$T^^444!DN==MM  "$T]]333 DM  5 4s%   AB'B#AB'B%B'%B'responsec                    SU;  a$  U R                   R                  [        S5      5        UR                  R                  [        U R                  5      :  a$  U R                   R                  [        S5      5        [        UR                  R                  5       H  nU R                  R                  S5        M      [        U R                  5      S:X  a  U R                  R                  5         g g )Ncommitz=Received an invalid subsequent response on the commit stream.zEReceived a commit response on the stream with no outstanding commits.r   )r   failr	   r9   acknowledged_commitslenr   rangepopr   r#   )r$   r7   _s      r%   _handle_responseCommitterImpl._handle_responsec   s    8#!!"S
 ??//#d6O6O2PP!!"[
 x;;<A%%))!, =t(()Q.KKOO /r(   c                 z   #     U R                   R                  5       I S h  vN nU R                  U5        M5   N7fr    )r   readr@   )r$   r7   s     r%   r/   CommitterImpl._receive_loopu   s5     !--2244H!!(+ 4s   ;9;c                    #     [         R                  " U R                  5      I S h  vN   U R                  5       I S h  vN   MB   N N7fr    )r!   sleepr   _flushr+   s    r%   r0   CommitterImpl._flush_loopz   s:     -- 3 3444++- 4s    %A
AA
A A
A
c                    #    U R                  5       I S h  vN   U R                  R                  5       (       d  U R                  5       I S h  vN   U R                  R	                  XU5      I S h  vN   g  N` N+ N	7fr    )r5   r   errorrG   	__aexit__)r$   exc_typeexc_valexc_tbs       r%   rK   CommitterImpl.__aexit__   sb       """%%''++-((FCCC 	#Cs3   A=A76A=A9#A=1A;2A=9A=;A=c                   #    U R                   c  g [        5       nU R                   UR                  l        U R                  R                  U R                   5        S U l         U R                  R                  5          U R                  R                  U5      I S h  vN   g  N! [         a"  n[        R                  SU 35         S nAg S nAff = f7f)NzFailed commit on stream: )r   r   r9   cursorr   appendr   clearr   writer
   _LOGGERdebug)r$   reqes      r%   rG   CommitterImpl._flush   s     '*, 00

!!(()=)=>#	;""((---! 	;MM5aS9::	;sB   A:C=B# B!B#  C!B# #
C-C
C
CCc                    #    U R                  5       I S h  vN   U R                  R                  U R                  R	                  5       5      I S h  vN   g  N@ N7fr    )rG   r   await_unless_failedr   waitr+   s    r%   wait_until_emptyCommitterImpl.wait_until_empty   sC     kkm224;;3C3C3EFFF 	Fs!   AA:AAAArQ   returnNc                     U R                   R                  5       (       a  U R                   R                  5       eXl        g r    )r   rJ   r   )r$   rQ   s     r%   r9   CommitterImpl.commit   s2    !!##""((**%r(   rJ   c                 @   #    U R                  5       I S h  vN   g  N7fr    )r5   )r$   rJ   s     r%   stop_processingCommitterImpl.stop_processing   s       """s   
connectionc                 ~  #    UR                  [        U R                  S95      I S h  vN   UR                  5       I S h  vN nSU;  a$  U R                  R                  [        S5      5        U R                  c%  U R                  (       a  U R                  S   U l        / U l        U R                  5         g  N Nz7f)N)r   r   z;Received an invalid initial response on the publish stream.)
rT   r   r   rC   r   r:   r	   r   r   r1   )r$   re   r7   s      r%   reinitializeCommitterImpl.reinitialize   s      ;DMMRSSS#**H$!!"Q
 '(('+'@'@'D$$&! 	T*s"   &B=B9B= B;A9B=;B=)r   r   r   r   r   r   r   r   )!__name__
__module____qualname____firstlineno__r   __annotations__floatr   r   r   r   r   r   r!   Futurer"   r   r&   r*   r1   r5   r@   r/   r0   rK   rG   r]   r9   r
   rc   r   rh   __static_attributes__ r(   r%   r   r   *   s    )(#$&CC  f%%v,&''w~~&&MM+  #(*GG
	$B!)F $,
 
D;G&V & &
#+= #(*GG
r(   r   )r!   typingr   r   logging6google.cloud.pubsublite.internal.wait_ignore_cancelledr   /google.cloud.pubsublite.internal.wire.committerr   9google.cloud.pubsublite.internal.wire.retrying_connectionr   r   google.api_core.exceptionsr	   r
   >google.cloud.pubsublite.internal.wire.connection_reinitializerr   0google.cloud.pubsublite.internal.wire.connectionr   google.cloud.pubsublite_v1r    google.cloud.pubsublite_v1.typesr   r   r   	getLoggerrj   rU   r   rr   r(   r%   <module>r~      sk     !  U E N H -  

H
%F$&CCFr(   