
                             S SK r S SK Jr  S SKrS SKrS SKJr  S SKJr  S SKJ	r	  S SK
JrJr  S SKJr  S SKJrJrJrJr  S S	KJr  S S
KJr  SrSr " S S\\\4   \5      rg)    N)Future)	Cancelled)adapt_error)is_retryable)wait_ignore_errorswait_ignore_cancelled)ConnectionReinitializer)
ConnectionRequestResponseConnectionFactory)WorkItem)PermanentFailableg{Gz?
   c                   @  ^  \ rS rSr% Sr\\\4   \S'   \	\\4   \S'   \
R                  \S'   \
R                  \S'   S\S'   S	\S
'   S\\\4   S\	\\4   4U 4S jjrS rS rS\SS4S jrS\4S jrS rS\\\4   4S jr\S\\\4   S\\\4   4S j5       rSrU =r$ )RetryingConnection+   z_A connection which performs retries on an underlying stream when experiencing retryable errors._connection_factory_reinitializer_initialized_once
_loop_taskz&asyncio.Queue[WorkItem[Request, None]]_write_queuezasyncio.Queue[Response]_read_queueconnection_factoryreinitializerc                    > [         TU ]  5         Xl        X l        [        R
                  " 5       U l        [        R                  " SS9U l        [        R                  " SS9U l	        g )N   maxsize)
super__init__r   r   asyncioEventr   Queuer   r   )selfr   r   	__class__s      Llib/third_party/google/cloud/pubsublite/internal/wire/retrying_connection.pyr!   RetryingConnection.__init__7   sL    
 	#5 +!(#MM!4"==3    c                    #    [         R                  " U R                  5       5      U l        U R	                  U R
                  R                  5       5      I S h  vN   U $  N7fN)r"   ensure_future	_run_loopr   await_unless_failedr   waitr%   s    r'   
__aenter__RetryingConnection.__aenter__C   sL     !//0@A&&t'='='B'B'DEEE 	Fs   AA!AA!c                    #    U R                  [        S5      5        U R                  R                  5         [	        U R                  5      I S h  vN   g  N7f)NzConnection shutting down.)failr   r   cancelr   )r%   exc_typeexc_valexc_tbs       r'   	__aexit__RetryingConnection.__aexit__H   s9     		)789  111s   AAAArequestreturnNc                    #    [        U5      nU R                  U R                  R                  U5      5      I S h  vN   U R                  UR                  5      I S h  vN $  N' N7fr+   )r   r.   r   putresponse_future)r%   r;   items      r'   writeRetryingConnection.writeM   sW      &&t'8'8'<'<T'BCCC--d.B.BCCC 	DCs!   9A'A#"A'A%A'%A'c                 p   #    U R                  U R                  R                  5       5      I S h  vN $  N7fr+   )r.   r   getr0   s    r'   readRetryingConnection.readR   s+     --d.>.>.B.B.DEEEEs   -646c                 j  #     SnU R                  5       (       d   U R                  R                  5       nUI Sh  vN  ISh  vN nU R                  R	                  U5      I Sh  vN   U R
                  R                  5         SnU R                  U5      I Sh  vN   SSS5      ISh  vN   U R                  5       (       d  M  gg N N Nf N3 N%! , ISh  vN  (       d  f       N:= f! [         Ga  nU R                  5       (       a   SnAg[        U5      n[        R                  " S[        R                  " 5       5        [        U5      (       d  U R                  U5         SnAg U R                  R!                  U5      I Sh  vN    O5! [         a(  nU R                  [        U5      5         SnA SnAgSnAff = fU R"                  R%                  5       (       dk  U R"                  R'                  5       R(                  nUR+                  5       (       d  UR-                  U5        U R"                  R%                  5       (       d  Mk  [.        R0                  " SS9U l        [.        R0                  " SS9U l        [5        [.        R6                  " [9        [:        [<        SU-  -  5      5      5      I Sh  vN    US-  n SnAGNSnAff = f! [         aN  n[        R                   " S[        R                  " 5       5        U R                  [        U5      5         SnAgSnAff = f7f)zK
Processes actions on this connection and handles retries until cancelled.
r   Nz Saw a stream failure. Cause: 
%sr   r      z4Saw a stream failure which was unhandled. Cause: 
%s)errorr   newr   reinitializer   set_loop_connection	Exceptionr   loggingdebug	traceback
format_excr   r4   stop_processingr   empty
get_nowaitr?   	cancelledset_exceptionr"   r$   r   r   sleepmin_MAX_BACKOFF_SECS_MIN_BACKOFF_SECS)r%   bad_retriesconn_fut
connectione
stop_errorr?   s          r'   r-   RetryingConnection._run_loopU   sl    1	&Kjjll(%#77;;=H&.:"11>>&   ..224&'"33&   !/ jjll !/
 !/ ! %zz||#AAMM;Y=Q=Q=S (??		!"11AA!DDD$ 		+j"9: #//5577*.*;*;*F*F*H*X*X.88::+99!< #//5577 (/}}Q'?D$(/a(@D%/ 1 1Q^ D    1$K;%<  	&MMG$$& IIk!n%%	&sU  L3K C. C
C. CC. C'C(4CCC!C. ,C-C. 1K L3
C. C. CCC. C+CC+'C. *K +C. .K9KK L3AK)K -L3/FFFK
GG8K<K  L3GBKA,K?K 
K
K KK 
L0"AL+&L3+L00L3r^   c                   #    [         R                  " UR                  5       5      n[         R                  " U R                  R	                  5       5      n  [         R
                  " X2/[         R                  S9I S h  vN u  pEX4;   aO  U R                  XI S h  vN 5      I S h  vN   [         R                  " U R                  R	                  5       5      nX$;   aO  U R                  R                  UI S h  vN 5      I S h  vN   [         R                  " UR                  5       5      nM   N N N N< N2! UR                  5         UR                  5         [        U5      I S h  vN    [        U5      I S h  vN    f = f7f)N)return_when)r"   r,   rE   r   rD   r/   FIRST_COMPLETED_handle_writer   r>   r5   r   )r%   r^   	read_task
write_taskdone_s         r'   rM   #RetryingConnection._loop_connection   s2    (/(=(=joo>O(P	8?8M8M!!#9

	1 '+9P9P!  %,,Z9IJJJ!(!6!6t7H7H7L7L7N!OJ$**..Y??? ' 5 5joo6G HI  :JJ 0?? $Y///$Z000s   AF(D9 >D/?D9 D1
D9 &D3'AD9 8D59D9 D7+D9 1D9 3D9 5D9 7D9 9/F(E+)F<E?=FFto_writec                   #     U R                  UR                  5      I S h  vN   UR                  R                  S 5        g  N ! [         a-  n[        U5      nUR                  R                  U5        UeS nAff = f7fr+   )rA   r;   r?   
set_resultrN   r   rW   )r^   rk   r_   s      r'   re    RetryingConnection._handle_write   sn     	""8#3#3444$$//5 5 	AA$$2215G	s8   A?A AA A?A 
A<(A77A<<A?)r   r   r   r   r   r   )__name__
__module____qualname____firstlineno____doc__r   r   r   __annotations__r	   r"   r#   r   r!   r1   r9   rA   rE   r-   r
   rM   staticmethodr   re   __static_attributes____classcell__)r&   s   @r'   r   r   +   s    i*7H+<==+GX,=>>}}$::**
4-gx.?@
4 /w/@A
4
2
D7 Dt D
FH F5&n1GX<M1N 1, 	w01	=EgxFW=X	 	r)   r   )r"   r   rO   rQ   google.api_core.exceptionsr   8google.cloud.pubsublite.internal.wire.permanent_failabler   -google.cloud.pubsublite.internal.status_codesr   6google.cloud.pubsublite.internal.wait_ignore_cancelledr   r   >google.cloud.pubsublite.internal.wire.connection_reinitializerr	   0google.cloud.pubsublite.internal.wire.connectionr
   r   r   r   /google.cloud.pubsublite.internal.wire.work_itemr   r   r[   rZ   r    r)   r'   <module>r      s`        0 P F  E V  AGX$568I Ar)   