
    g$                         S r SSKJr  SSKJr  SSKJr  SSKrSSKrSSKrSSKJ	r
  SSKJr  SSKJr  S	r " S
 S\5      r " S S\5      r " S S\5      rSS jrg)a  Logic for streaming logs.

We implement streaming with two important implementation details.  First,
we use polling because Cloud Logging does not support streaming. Second, we
have no guarantee that we will receive logs in chronological order.
This is because clients can emit logs with chosen timestamps.  However,
we want to generate an ordered list of logs.  So, we choose to not fetch logs
in the most recent N seconds.  We also decided to skip logs that are returned
too late (their timestamp is more than N seconds old).
    )absolute_import)division)unicode_literalsN)common)log)timesz1970-01-01T01:00:00.000000000Zc                   4    \ rS rSrSrS	S jrS rS rS rSr	g)
LogPosition+   zTracks a position in the log.

Log messages are sorted by timestamp.  Within a given timestamp, logs will be
returned in order of insert_id.
Nc                 H    U=(       d    [         U l        SU l        SU l        g )N F)_UNIX_ZERO_TIMESTAMP	timestamp	insert_idneed_insert_id_in_lb_filter)selfr   s     -lib/googlecloudsdk/command_lib/logs/stream.py__init__LogPosition.__init__2   s    6"6DNDN',D$    c                     XR                   :  a  gXR                   :X  a  X R                  :  a  X l        SU l        ggSU l        X l        Xl         g)a  Update the log position based on new log entry data.

Args:
    timestamp: the timestamp of the message we just read, as an RFC3339
               string.
    insert_id: the insert id of the message we just read.

Returns:
    True if the position was updated; False if not.
FT)r   r   r   )r   r   r   s      r   UpdateLogPosition.Update7   sO     >>!	nn	$	^^	#"+/( */d& n nr   c                     U R                   (       a1  SR                  U R                  U R                  U R                  5      $ SR                  U R                  5      $ )zThe log message filter which keeps out messages which are too old.

Returns:
    The lower bound filter text that we should use.
z9((timestamp="{0}" AND insertId>"{1}") OR timestamp>"{2}")ztimestamp>="{0}")r   formatr   r   )r   s    r   GetFilterLowerBoundLogPosition.GetFilterLowerBoundT   sJ     ''HOO
..$..$..: :  &&t~~66r   c                     [         R                  " U R                  5      R                  nUR	                  US9nU[
        R                  " SS9-
  nSR                  [         R                  " US5      5      $ )zThe log message filter which keeps out messages which are too new.

Args:
    now: The current time, as a datetime object.

Returns:
    The upper bound filter text that we should use.
)tzinfo   )secondsztimestamp<"{0}"z%Y-%m-%dT%H:%M:%S.%6f%Ez)	r   ParseDateTimer   r   replacedatetime	timedeltar   FormatDateTime)r   nowr   upper_bounds       r   GetFilterUpperBoundLogPosition.GetFilterUpperBounda   si       077F
++V+
$C**155K##[*DEG Gr   )r   r   r   N)
__name__
__module____qualname____firstlineno____doc__r   r   r   r)   __static_attributes__ r   r   r
   r
   +   s    -
:7Gr   r
   c                   $    \ rS rSrSrS rS rSrg)_TaskIntervalTimerr   a  Timer to facilitate performing multiple tasks at different intervals.

Here's an overview of how the caller sees this class:

>>> timer = _TaskIntervalTimer({'a': 5, 'b': 10, 'c': 3})
>>> timer.Wait()  # sleeps 3 seconds, total time elapsed 3
['c']
>>> timer.Wait()  # sleeps 2 seconds, total time elapsed 5
['a']
>>> timer.Wait()  # sleeps 1 second,  total time elapsed 6
['c']
>>> timer.Wait()  # sleeps 3 seconds, total time elapsed 9
['c']
>>> timer.Wait()  # sleeps 1 second,  total time elapsed 10
['a', 'c']

And here's how it might be used in practice:

>>> timer = _TaskIntervalTimer({'foo': 1, 'bar': 10, 'baz': 3})
>>> while True:
...   tasks = timer.Wait()
...   if 'foo' in tasks:
...     foo()
...   if 'bar' in tasks:
...     bar()
...   if 'baz' in tasks:
...     do_baz()


Attributes:
  task_intervals: dict (hashable to int), mapping from some representation of
    a task to to the interval (in seconds) at which the task should be
    performed
c                 N    Xl         U R                   R                  5       U l        g r+   )task_intervalscopy_time_remaining)r   r7   s     r   r   _TaskIntervalTimer.__init__   s    (..335Dr   c                 d   [        U R                  R                  5       5      n[        R                  " U5        [        5       nU R                   H\  nU R                  U==   U-  ss'   U R                  U   S:X  d  M/  U R                  U   U R                  U'   UR                  U5        M^     U$ )z`Wait for the next task(s) and return them.

Returns:
  set, the tasks which should be performed
r   )minr9   valuestimesleepsetr7   add)r   
sleep_timetaskstasks       r   Wait_TaskIntervalTimer.Wait   s     T))0023JJJzEE$$
4 J. 			d	#q	(%)%8%8%>T"		$	 %
 Lr   )r9   r7   N)r,   r-   r.   r/   r0   r   rE   r1   r2   r   r   r4   r4   r   s    !F6r   r4   c                   h    \ rS rSrSr " S S\R                  5      rSrSSS SS4S	 jr	S
 r
S rSrg)
LogFetcher   zA class which fetches job logs.c                       \ rS rSrSrSrSrg)LogFetcher._Tasks         r2   N)r,   r-   r.   r/   POLLCHECK_CONTINUEr1   r2   r   r   _TasksrK      s    DNr   rQ   i  N
   c                     g)NTr2   )xs    r   <lambda>LogFetcher.<lambda>   s    tr   c                     U=(       d    / U l         X l        U=(       d    UU l        X0l        [	        X5      n[
        R                  " SR                  U5      5        [        US9U l	        g)a  Initializes the LogFetcher.

Args:
  filters: list of string filters used in the API call.
  polling_interval: amount of time to sleep between each poll.
  continue_func: One-arg function that takes in the number of empty polls
    and outputs a boolean to decide if we should keep polling or not. If not
    given, keep polling indefinitely.
  continue_interval: int, how often to check whether the job is complete
    using continue_function. If not provided, defaults to the same as the
    polling interval.
  num_prev_entries: int, if provided, will first perform a decending
    query to set a lower bound timestamp equal to that of the n:th entry.
zstart timestamp: {})r   N)
base_filterspolling_intervalcontinue_intervalshould_continue_GetTailStartingTimestampr   debugr   r
   log_position)r   filtersrY   continue_funcrZ   num_prev_entriesstart_timestamps          r   r   LogFetcher.__init__   sY    "  2D,.B2BD(/JOII#**?;<#o>Dr   c                    [         R                   R                  5       nU R                  R                  5       nU R                  R	                  U5      nU R
                  X#/-   n[        R                  " SR                  U5      SU R                  S9nU Vs/ s H<  nU R                  R                  UR                  UR                  5      (       d  M:  UPM>     sn$ s  snf )zRetrieves a batch of logs.

After we fetch the logs, we ensure that none of the logs have been seen
before.  Along the way, we update the most recent timestamp.

Returns:
  A list of valid log entries.
 AND ASC
log_filterorder_bylimit)r$   utcnowr^   r   r)   rX   logging_common	FetchLogsjoinLOG_BATCH_SIZEr   r   insertId)r   rk   lower_filterupper_filter
new_filterentriesentrys          r   GetLogsLogFetcher.GetLogs   s     %%'F$$88:L$$88@L""l%AAJ&&<<
+!!#G  ' Gwe$$U__ennE w G G Gs   9CCc              #     #    [        U R                  R                  U R                  U R                  R                  U R
                  05      nSnU R                  R                  U R                  R                  /n U R                  R                  U;   a,  U R                  5       nU(       a  SnU H  nUv   M	     OUS-  nU R                  R                  U;   a  U R                  U5      nU(       d  gUR                  5       nM  7f)zPolls Get API for more logs.

We poll so long as our continue function, which considers the number of
periods without new logs, returns True.

Yields:
    A single log entry.
r   rM   N)	r4   rQ   rO   rY   rP   rZ   rv   r[   rE   )r   timerempty_pollsrC   logs	log_entryr[   s          r   	YieldLogsLogFetcher.YieldLogs   s      $//""D$:$:  E K[[t{{99:E
			U	"||~+iO   
+		#	#u	,..{;
jjle s   DD)rX   rZ   r^   rY   r[   )r,   r-   r.   r/   r0   enumEnumrQ   ro   r   rv   r}   r1   r2   r   r   rH   rH      s9    'tyy  .!B+t $?2G(r   rH   c                     U(       d  g[        [        R                  " SR                  U 5      SUS95      n[	        U5      U:  a  g[        U5      S   R
                  $ )a|  Returns the starting timestamp to start streaming logs from.

Args:
  filters: [str], existing filters, should not contain timestamp constraints.
  offset: int, how many entries ago we should pick the starting timestamp.
    If not provided, unix time zero will be returned.

Returns:
  str, A timestamp that can be used as lower bound or None if no lower bound
    is necessary.
Nre   DESCrg   )listrl   rm   rn   lenr   )r_   offsetrt   s      r   r\   r\     sX     
))W\\'5J39068 9' 	\F	gr		$	$$r   r+   )r0   
__future__r   r   r   r$   r>   r   googlecloudsdk.api_lib.loggingr   rl   googlecloudsdk.corer   googlecloudsdk.core.utilr   r   objectr
   r4   rH   r\   r2   r   r   <module>r      sa   	 '  '    C # * 8 DG& DGN7 7tS Sl%r   