
    E/                         S SK Jr  S SKrS SKrS SKJrJrJr  S SKrS SK	J
r
  S SKJr  \R                  " \5      r\\
R                      r " S S5      r " S S	\5      rg)
    )OrderedDictN)DictOptionalType)types)
exceptionsc                   4    \ rS rSrSrS\S\S\4S jrS rSr	g	)
_QuantityReservation   z2A (partial) reservation of quantifiable resources.bytes_reservedbytes_neededhas_slotc                 (    Xl         X l        X0l        g )Nr   r   r   )selfr   r   r   s       Clib/third_party/google/cloud/pubsub_v1/publisher/flow_controller.py__init___QuantityReservation.__init__"   s    ,(     c                     [        U 5      R                   SU R                   SU R                   SU R                   S3$ )Nz(bytes_reserved=z, bytes_needed=z, has_slot=))type__name__r   r   r   )r   s    r   __repr___QuantityReservation.__repr__'   sN    Dz""# $"112 3 --. /a)	
r   )r   r   r   N)
r   
__module____qualname____firstlineno____doc__intboolr   r   __static_attributes__ r   r   r
   r
      s$    <!s !# ! !

r   r
   c                       \ rS rSrSrS\R                  4S jrS\SS4S jr	S\SS4S	 jr
SS
 jrS\4S jrS\S\4S jr SS\\   S\\   S\4S jjrSrg)FlowController0   zzA class used to control the flow of messages passing through it.

Args:
    settings: Desired flow control configuration.
settingsc                     Xl         SU l        SU l        [        5       U l        SU l        SU l        [        R                  " 5       U l	        [        R                  " U R                  S9U l        g )Nr   )lock)	_settings_message_count_total_bytesr   _waiting_reserved_bytes_reserved_slots	threadingLock_operational_lock	Condition_has_capacity)r   r'   s     r   r   FlowController.__init__7   sb    !  
 GRm   "+!1 '00d6L6LMr   messagereturnNc                 p   U R                   R                  [        R                  R                  :X  a  gU R
                     U R                  U5      (       dL  U =R                  S-  sl        U =R                  UR                  R                  5       -  sl         SSS5        gU R                   R                  [        R                  R                  :X  ai  U R                  U R                  S-   U R                  UR                  R                  5       -   S9nSR                  U5      n[        R                  " U5      eU R                   R                  [        R                  R                   :X  d   eUR                  R                  5       U R                   R"                  :  d  U R                   R$                  S:  aO  U R                  SUR                  R                  5       S9nSR                  U5      n[        R                  " U5      e[&        R(                  " 5       nU R                  U5      (       a  X@R*                  ;  a1  [-        SUR                  R                  5       SS9nXPR*                  U'   [.        R1                  S	R                  U R                  5       5      5        U R2                  R5                  5         [.        R1                  S
R                  U R                  5       5      5        U R                  U5      (       a  M  U =R                  S-  sl        U =R                  UR                  R                  5       -  sl        U =R6                  U R*                  U   R8                  -  sl        U =R:                  S-  sl        U R*                  U	 SSS5        g! , (       d  f       g= f)a  Add a message to flow control.

Adding a message updates the internal load statistics, and an action is
taken if these limits are exceeded (depending on the flow control settings).

Args:
    message:
        The message entering the flow control.

Raises:
    :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
        Raised when the desired action is
        :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR` and
        the message would exceed flow control limits, or when the desired action
        is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK` and
        the message would block forever against the flow control limits.
N   )message_counttotal_bytesz+Flow control limits would be exceeded - {}.zLTotal flow control limits too low for the message, would block forever - {}.r   Fr   z>Blocking until there is enough free capacity in the flow - {}.z7Woke up from waiting on free capacity in the flow - {}.)r*   limit_exceeded_behaviorr   LimitExceededBehaviorIGNOREr2   _would_overflowr+   r,   _pbByteSizeERROR
_load_infoformatr   FlowControlLimitErrorBLOCK
byte_limitmessage_limitr0   current_threadr-   r
   _LOGGERdebugr4   waitr.   r   r/   )r   r6   	load_info	error_msgrI   reservations         r   addFlowController.addN   s   $ >>11U5P5P5W5WW##''00##q(#!!W[[%9%9%;;!	 $# 66..445 !OO"&"5"5"9 $ 1 1GKK4H4H4J J , 	 JPP	 !66yAA 66..4455 $$&)B)BB>>//!3 OO"#1E1E1G , 	006y0A  !66yAA&557N&&w//!6"6'(%,[[%9%9%;!&#K
 5@MM.1 &!23
 ""'') &!23! &&w//, 1$!5!5!77  DMM.$A$P$PP   A% n-O $##s    AN'#I)N'BN''
N5c                    U R                   R                  [        R                  R                  :X  a  gU R
                     U =R                  S-  sl        U =R                  UR                  R                  5       -  sl        U R                  S:  d  U R                  S:  aP  [        R                  " S[        SS9  [        SU R                  5      U l        [        SU R                  5      U l        U R                  5         U R                  5       (       a/  [         R#                  S5        U R$                  R'                  5         SSS5        g! , (       d  f       g= f)zgRelease a mesage from flow control.

Args:
    message:
        The message entering the flow control.
Nr9   r   z=Releasing a message that was never added or already released.   )category
stacklevelz2Notifying threads waiting to add messages to flow.)r*   r<   r   r=   r>   r2   r+   r,   r@   rA   warningswarnRuntimeWarningmax_distribute_available_capacity_ready_to_unblockrJ   rK   r4   
notify_all)r   r6   s     r   releaseFlowController.release   s    >>11U5P5P5W5WW##1$!5!5!77""Q&$*;*;a*?S+ 
 '*!T-@-@&A#$'4+<+<$=!//1 %%''RS""--/' $##s    DE
Ec                    U R                   R                  U R                  -
  U R                  -
  nU R                   R                  U R
                  -
  U R                  -
  nU R                  R                  5        H  nUS::  a  US::  a    gUS:  a2  UR                  (       d!  SUl	        U =R                  S-  sl        US-  nUS::  a  MQ  UR                  UR                  -
  nUS:  aA  SR                  UR                  UR                  5      n[        R                  " U[        S9  Sn[!        XB5      nU=R                  U-  sl        U =R                  U-  sl        X&-  nM     g)zDistribute available capacity among the waiting threads in FIFO order.

The method assumes that the caller has obtained ``_operational_lock``.
r   Tr9   z Too many bytes reserved: {} / {})rT   N)r*   rH   r+   r/   rG   r,   r.   r-   valuesr   r   r   rD   rV   rW   rX   min)r   available_slotsavailable_bytesrO   bytes_still_neededmsgcan_gives          r   rZ   -FlowController._distribute_available_capacity   sN    NN((4+>+>>AUAUU 	 NN%%(9(99D<P<PP 	  ==//1K!#1(< ";+?+?'+$$$)$1$ !#!,!9!9K<V<V!V!A%8??..0H0H cN;%&"-?H&&(2&  H, 'O5 2r   c                     U R                   (       aX  [        [        U R                   R                  5       5      5      nUR                  UR
                  :  =(       a    UR                  $ g)zDetermine if any of the threads waiting to add a message can proceed.

The method assumes that the caller has obtained ``_operational_lock``.
F)r-   nextiterr`   r   r   r   )r   first_reservations     r   r[    FlowController._ready_to_unblock   sW    
 == !%T$--*>*>*@%A B!004E4R4RR /%..
 r   c                    U R                   R                  [        R                  " 5       5      nU(       a&  UR                  UR
                  :  nUR                  nOSnSnU R                  U R                  -   UR                  R                  5       -   nXPR                  R                  :  =(       a    U(       + nU(       + =(       a3    U R                  U R                  -   S-   U R                  R                  :  nU=(       d    U$ )zDetermine if accepting a message would exceed flow control limits.

The method assumes that the caller has obtained ``_operational_lock``.

Args:
    message: The message entering the flow control.
Fr9   )r-   getr0   rI   r   r   r   r,   r.   r@   rA   r*   rG   r+   r/   rH   )r   r6   rO   enough_reservedr   bytes_takensize_overflowmsg_count_overflows           r   r?   FlowController._would_overflow  s     mm''	(@(@(BC)88K<T<TTO"++H#OH''$*>*>>AUAUAWW#nn&?&??WDW!)\ 
  4#7#77!;nn**+ 	
 2 22r   r:   r;   c                     Uc  U R                   nUc  U R                  nSU SU R                  R                   SU R                   SU SU R                  R
                   SU R                   S3$ )ap  Return the current flow control load information.

The caller can optionally adjust some of the values to fit its reporting
needs.

The method assumes that the caller has obtained ``_operational_lock``.

Args:
    message_count:
        The value to override the current message count with.
    total_bytes:
        The value to override the current total bytes with.
z
messages: z / z (reserved: z
), bytes: r   )r+   r,   r*   rH   r/   rG   r.   )r   r:   r;   s      r   rC   FlowController._load_info  s        //M++K s4>>+G+G*H I../ 0!]#dnn&?&?%@ A../q2	
r   )r4   r+   r2   r.   r/   r*   r,   r-   )r7   N)NN)r   r   r   r   r   r   PublishFlowControlr   MessageTyperP   r]   rZ   r!   r[   r?   r   r    strrC   r"   r#   r   r   r%   r%   0   s    N!9!9 N.\.; \.4 \.|0{ 0t 0>&(P4  3{ 3t 38 QU
%c]
@H
	
 
r   r%   )collectionsr   loggingr0   typingr   r   r   rV   google.cloud.pubsub_v1r    google.cloud.pubsub_v1.publisherr   	getLoggerr   rJ   PubsubMessagerw   r
   objectr%   r#   r   r   <module>r      s[    $   ' '  ( 7 

H
% 5&&'
 
"I
V I
r   