
    6                         S r SSKJr  SSKJr  SSKJr  SSKrSSKJr  SSKJ	r	  SSK
Jr  S	rS
rSrSr " S S5      r " S S\	R"                  5      r " S S5      rg)zImplements logic for tracking task dependencies in task_graph_executor.

See go/parallel-processing-in-gcloud-storage for more information.
    )absolute_import)division)unicode_literalsN)List)errors)log   zTask Graph:z   - Task ID: {}
z^    - Task: {}
    - Dependency Count: {}
    - Dependent Task IDs: {}
    - Is Submitted: {}
c                   $    \ rS rSrSrS rS rSrg)TaskWrapper*   a  Embeds a Task instance in a dependency graph.

Attributes:
  id (Hashable): A unique identifier for this task wrapper.
  task (googlecloudsdk.command_lib.storage.tasks.task.Task): An instance of a
    task class.
  dependency_count (int): The number of unexecuted dependencies this task has,
    i.e. this node's in-degree in a graph where an edge from A to B indicates
    that A must be executed before B.
  dependent_task_ids (Optional[Iterable[Hashable]]): The id of the tasks that
    require this task to be completed for their own completion. This value
    should be None if no tasks depend on this one.
  is_submitted (bool): True if this task has been submitted for execution.
c                 D    Xl         X l        SU l        X0l        SU l        g )Nr   F)idtaskdependency_countdependent_task_idsis_submitted)selftask_idr   r   s       :lib/googlecloudsdk/command_lib/storage/tasks/task_graph.py__init__TaskWrapper.__init__:   s"    GID0D    c                 "   [         R                  U R                  5      [        R                  U R                  R
                  R                  U R                  (       a  [        U R                  5      OSU R                  U R                  5      -   $ )z3Returns a string representation of the TaskWrapper.r   )
TASK_WRAPPER_IDformatr   TASK_DETAILSr   	__class____name__r   lenr   )r   s    r   __str__TaskWrapper.__str__A   so     	tww'II((&& ''(,-##	
	
	r   )r   r   r   r   r   N)r   
__module____qualname____firstlineno____doc__r   r    __static_attributes__ r   r   r   r   *   s    r   r   c                       \ rS rSrSrSrg)InvalidDependencyErrorO   zRaised on attempts to create an invalid dependency.

Invalid dependencies are self-dependencies and those that involve nodes that
do not exist.
r'   N)r   r"   r#   r$   r%   r&   r'   r   r   r)   r)   O   s    r   r)   c                   N    \ rS rSrSrS rSS jrS rS rS r	S	\
\   4S
 jrSrg)	TaskGraphW   a)  Tracks dependencies between Task instances.

See googlecloudsdk.command_lib.storage.tasks.task.Task for the definition of
the Task class.

The public methods in this class are thread safe.

Attributes:
  is_empty (threading.Event): is_empty.is_set() is True when the graph has no
    tasks in it.
c                     [         R                  " 5       U l        U R                  R                  5         [         R                  " 5       U l        0 U l        [         R                  " U5      U l        g)a;  Initializes a TaskGraph instance.

Args:
  top_level_task_limit (int): A top-level task is a task that no other tasks
    depend on for completion (i.e. dependent_task_ids is None). Adding
    top-level tasks with TaskGraph.add will block until there are fewer than
    this number of top-level tasks in the graph.
N)		threadingEventis_emptysetRLock_lock_task_wrappers_in_graph	Semaphore_top_level_task_semaphore)r   top_level_task_limits     r   r   TaskGraph.__init__d   sR     OO%DMMM "DJ $&D 
 &/%8%89M%ND"r   Nc                    USL nU(       a  U R                   R                  5         U R                     UR                  b  UR                  nO[	        U5      nX@R
                  ;   a  UR                  bN  [        R                  R                  SR                  UR                  R                  UR                  5      5        OB[        R                  R                  SR                  UR                  R                  5      5        U(       a  U R                   R                  5          SSS5        g[        XAU5      nU=(       d    /  H&  n U R
                  U   =R                  S-  sl        M(     XPR
                  UR                  '   U R"                  R%                  5         SSS5        U$ ! [         a    [         ef = f! , (       d  f       W$ = f)a  Adds a task to the graph.

Args:
  task (googlecloudsdk.command_lib.storage.tasks.task.Task): The task to be
    added.
  dependent_task_ids (Optional[List[Hashable]]): TaskWrapper.id attributes
    for tasks already in the graph that require the task being added to
    complete before being executed. This argument should be None for
    top-level tasks, which no other tasks depend on.

Returns:
  A TaskWrapper instance for the task passed into this function, or None if
  task.parallel_processing_key was the same as another task's
  parallel_processing_key.

Raises:
  InvalidDependencyError if any id in dependent_task_ids is not in the
  graph, or if a the add operation would have created a self-dependency.
NzcSkipping {} for {}. This can occur if a cp command results in multiple writes to the same resource.zoSkipping {}. This is probably because due to a bug that caused it to be submitted for execution more than once.   )r7   acquirer4   parallel_processing_keyr   r5   r   statusPrintr   r   r   releaser   r   KeyErrorr)   r1   clear)r   r   r   is_top_level_task
identifiertask_wrapperr   s          r   addTaskGraph.add~   s   ( +d2
$$,,.			%	%	111
X
	33	3''3
**

66<f..))4+G+G7IJ
 **

HHN..))I+,
 

(
(
0
0
2) 
, !3EFl'-2-'	'

&
&w
/
@
@A
E
@ . 7C""<??3
mm? 
@   	'&
&	'7 
@ s*   C4G /G 
"F,,6G ,F==G  
Gc                 F   U R                      UR                  (       a  / sSSS5        $ UR                  (       d  U/sSSS5        $ U R                  UR                  	 UR
                  cP  U R                  R                  5         U R                  (       d  U R                  R                  5         / sSSS5        $ / nUR
                   H:  nU R                  U   nU=R                  S-  sl        X R                  U5      -  nM<     UsSSS5        $ ! , (       d  f       g= f)a  Recursively removes a task and its parents from the graph if possible.

Tasks can be removed only if they have been submitted for execution and have
no dependencies. Removing a task can affect dependent tasks in one of two
ways, if the removal left the dependent tasks with no dependencies:
 - If the dependent task has already been submitted, it can also be removed.
 - If the dependent task has not already been submitted, it can be
   submitted for execution.

This method removes all tasks that removing task_wrapper allows, and returns
all tasks that can be submitted after removing task_wrapper.

Args:
  task_wrapper (TaskWrapper): The task_wrapper instance to remove.

Returns:
  An Iterable[TaskWrapper] that yields tasks that are submittable after
  completing task_wrapper.
Nr;   )r4   r   r   r5   r   r   r7   r@   r1   r2   complete)r   rE   submittable_tasksr   dependent_task_wrappers        r   rI   TaskGraph.complete   s    ( 
		&	&  
 && ~ 
 
&
&|
7		(	(	0&&..0++
--


+ 
4 !44'!%!=!=g!F//14/ 	]]+ABB 5 C 
s   DDA*D;AD
D c                    U R                      Ubk  UR                  b^  UR                  bQ  UR                   HA  nU R                  U   nUR                  R
                  R                  UR                  5        MC     Ub  UR                  (       d  U R                  U5      sSSS5        $ U/n[        UR                  5       HN  nU Vs/ s H  owR                  PM     nn/ nU H(  n	U R                  XS9nUc  M  UR                  U5        M*     MP     U(       d  U R                  U5        UsSSS5        $ s  snf ! , (       d  f       g= f)aA  Updates the graph based on the output of an executed task.

If some googlecloudsdk.command_lib.storage.task.Task instance `a` returns
the following iterables of tasks: [[b, c], [d, e]], we need to update the
graph as follows to ensure they are executed appropriately.

       /-- d <-\--/- b
  a <-/         \/
      \         /\
       \-- e <-/--\- c

After making these updates, `b` and `c` are ready for submission. If a task
does not return any new tasks, then it will be removed from the graph,
potentially freeing up tasks that depend on it for execution.

See go/parallel-processing-in-gcloud-storage#heading=h.y4o7a9hcs89r for a
more thorough description of the updates this method performs.

Args:
  executed_task_wrapper (task_graph.TaskWrapper): Contains information about
    how a completed task fits into a dependency graph.
  task_output (Optional[task.Output]): Additional tasks and
    messages returned by the task in executed_task_wrapper.

Returns:
  An Iterable[task_graph.TaskWrapper] containing tasks that are ready to be
  executed after performing graph updates.
N)r   )r4   messagesr   r5   r   received_messagesextendadditional_task_iteratorsrI   reversedr   rF   append)
r   executed_task_wrappertask_outputr   rK   parent_tasks_for_next_layertask_iteratorrE   r   r   s
             r   update_from_executed_task#TaskGraph.update_from_executed_task   s6   : 


!"".#66B,??G#'#?#?#H
 
 
%
%
7
7
>
>""$ @
 
	K$I$I }}23 
 &;$;! $K$I$IJ-0K
0KOO0K 	 
 ')#!D$N,%'..|< " K )+,(C 
(
) 
s*   BD<*D<	D7D<;2D<7D<<
E
c                 n   [         SU R                  R                  5        3S[        U R                  5       3/nU R                  (       aI  [        5       nUR                  U R                  U R                  R                  5       [        U5      5        OUR                  S5        SR                  U5      $ )z1Returns a string representation of the TaskGraph.z
 - Empty: z - Task Wrappers: zNo tasks in the graph to print.
)TASK_GRAPH_HEADERr1   is_setr   r5   r2   rP   _print_task_wrapper_recursivevaluesINITIAL_INDENT_LEVELrS   join)r   outputprinted_taskss      r   r    TaskGraph.__str__/  s     	
T]]))+,-
S!=!=>?@F
 ##emmm

,
,**113" mm5699Vr   returnc              #   R  #    U H  nUR                   U;  d  M  UR                  UR                   5        [        U5      v   UR                  (       d  MP  UR                   Vs/ s H  nU R                  U   PM     nnU R                  XbS-   U5       Sh  vN   M     gs  snf  N7f)a"  Recursively yields task wrappers and their dependencies.

Example:
  Suppose we have task wrappers representing tasks with dependencies:

  task_wrapper1 = TaskWrapper(id='task1',
  dependent_task_ids=['task2', 'task3']),
  task_wrapper2 = TaskWrapper(id='task2', dependent_task_ids=['task4'])
  task_wrapper3 = TaskWrapper(id='task3', dependent_task_ids=[])
  task_wrapper4 = TaskWrapper(id='task4', dependent_task_ids=[])

  task_wrappers = [task_wrapper1, task_wrapper2,
                   task_wrapper3, task_wrapper4]

  Calling _print_task_wrapper_recursive(task_wrappers, 0, set())
  would produce:

  ['task1',
    '  task2',
    '    task4',
    '  task3']

  This shows the tasks and their dependencies formatted with appropriate
  indentation levels.

Args:
  task_wrappers (list): List of task wrappers to print.
  indent_level (int): Current level of indentation for formatting.
  printed_tasks (set): Set of task IDs that have already been printed.


Yields:
  List of formatted strings representing the task wrappers
  and their dependencies.
r	   N)r   rF   strr   r5   r^   )r   task_wrappersindent_levelrc   rE   r   dependent_task_wrapperss          r   r^   'TaskGraph._print_task_wrapper_recursiveC  s     N &		-,//*,*** *<<%<' **73< " % 77%a'7H H H &
%Hs(   B'7B'B'#B <B'B%B')r4   r5   r7   r1   )N)r   r"   r#   r$   r%   r   rF   rI   rX   r    r   rg   r^   r&   r'   r   r   r,   r,   W   s7    
O48t5n>)@(1HCy1Hr   r,   )r%   
__future__r   r   r   r/   typingr   "googlecloudsdk.command_lib.storager   googlecloudsdk.corer   r`   r\   r   r   r   Errorr)   r,   r'   r   r   <module>rq      sg   
 '  '   5 #  ! & " "JV\\ ]H ]Hr   