diff options
author | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2020-12-10 12:21:25 +0000 |
---|---|---|
committer | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2020-12-10 12:21:25 +0000 |
commit | ae3344b474b60e2bd93e8e0636e2f56b505f0f73 (patch) | |
tree | 302c3436b66cdb20ae8013bd2af537c67b036b0c | |
parent | 270439458a2c34486d1907aec53844cd29bfe2d6 (diff) | |
parent | 4c7656f1c53b1943896f70fb9a2e35e1f8a3fbd3 (diff) | |
download | buildstream-ae3344b474b60e2bd93e8e0636e2f56b505f0f73.tar.gz |
Merge branch 'tristan/refactor-retry-task' into 'master'
Refactor State object
See merge request BuildStream/buildstream!2115
-rw-r--r-- | src/buildstream/_artifactelement.py | 4 | ||||
-rw-r--r-- | src/buildstream/_frontend/app.py | 2 | ||||
-rw-r--r-- | src/buildstream/_messenger.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 15 | ||||
-rw-r--r-- | src/buildstream/_state.py | 361 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 40 |
6 files changed, 239 insertions, 185 deletions
diff --git a/src/buildstream/_artifactelement.py b/src/buildstream/_artifactelement.py index 63bb904fd..94503c4df 100644 --- a/src/buildstream/_artifactelement.py +++ b/src/buildstream/_artifactelement.py @@ -32,7 +32,7 @@ from .node import Node if TYPE_CHECKING: from ._context import Context - from ._state import _Task + from ._state import Task # ArtifactElement() @@ -81,7 +81,7 @@ class ArtifactElement(Element): # (ArtifactElement): A newly created Element instance # @classmethod - def new_from_artifact_name(cls, artifact_name: str, context: "Context", task: Optional["_Task"] = None): + def new_from_artifact_name(cls, artifact_name: str, context: "Context", task: Optional["Task"] = None): # Initial lookup for already loaded artifact. with suppress(KeyError): diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 2f4235733..993747041 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -732,7 +732,7 @@ class App: elif choice == "retry": click.echo("\nRetrying failed job\n", err=True) unique_id = element[0] - self.stream._failure_retry(task.id, unique_id) + self.stream.retry_job(task.action_name, unique_id) # # Print the session heading if we've loaded a pipeline and there diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index 69a309f91..f18d3dc92 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -212,7 +212,7 @@ class Messenger: self.message(message) task = self._state.add_task(task_name, activity_name, task_name) - task.set_render_cb(self._render_status) + task.set_task_changed_callback(self._render_status) self._active_simple_tasks += 1 if not self._next_render: self._next_render = datetime.datetime.now() + _RENDER_INTERVAL diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 7acb062d0..23abbe46d 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ 
b/src/buildstream/_scheduler/scheduler.py @@ -113,7 +113,6 @@ class Scheduler: self._interrupt_callback = interrupt_callback self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) - self._state.register_task_retry_callback(self._failure_retry) # Ensure that the forkserver is started before we start. # This is best run before we do any GRPC connections to casd or have @@ -517,20 +516,6 @@ class Scheduler: self._ticker_callback() self.loop.call_later(1, self._tick) - def _failure_retry(self, task_id, unique_id): - task = self._state.tasks[task_id] - - queue = None - for q in self.queues: - if q.action_name == task.action_name: - queue = q - break - # Assert queue found, we should only be retrying a queued job - assert queue - element = Plugin._lookup(unique_id) - queue._task_group.failed_tasks.remove(element._get_full_name()) - queue.enqueue([element]) - def _handle_exception(self, loop, context: dict) -> None: e = context.get("exception") exc = bool(e) diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py index 4a552b0b9..ddcbb09b9 100644 --- a/src/buildstream/_state.py +++ b/src/buildstream/_state.py @@ -16,7 +16,8 @@ # import datetime -from collections import OrderedDict +from typing import Optional, Tuple, List, Dict, Callable +from .types import _DisplayKey # TaskGroup @@ -24,23 +25,27 @@ from collections import OrderedDict # The state data stored for a group of tasks (usually scheduler queues) # # Args: -# name (str): The name of the Task Group, e.g. 'build' -# state (State): The state object -# complete_name (str): Optional name for frontend status rendering, e.g. 'built' +# name: The name of the Task Group, e.g. 'build' +# state: The state object +# complete_name: Optional name for frontend status rendering, e.g. 
'built' # class TaskGroup: - def __init__(self, name, state, complete_name=None): - self.name = name - self.complete_name = complete_name - self.processed_tasks = 0 - self.skipped_tasks = 0 - # NOTE: failed_tasks is a list of strings instead of an integer count - # because the frontend requires the full list of failed tasks to - # know whether to print failure messages for a given element. - self.failed_tasks = [] - - self._state = state - self._update_task_group_cbs = [] + def __init__(self, name: str, state: "State", complete_name: Optional[str] = None) -> None: + + # + # Public members + # + self.name: str = name # The name of tasks in this group + self.complete_name: Optional[str] = complete_name # Optional name for frontend status rendering, e.g. 'built' + + self.processed_tasks: int = 0 # Number of processed tasks + self.skipped_tasks: int = 0 # Number of skipped tasks + self.failed_tasks: List[str] = [] # List of element full names which failed + + # + # Private members + # + self._state: "State" = state ########################################### # Core-facing APIs to drive notifications # @@ -52,7 +57,7 @@ class TaskGroup: # # This is a core-facing API and should not be called from the frontend # - def add_processed_task(self): + def add_processed_task(self) -> None: self.processed_tasks += 1 for cb in self._state._task_groups_changed_cbs: cb() @@ -63,7 +68,7 @@ class TaskGroup: # # This is a core-facing API and should not be called from the frontend # - def add_skipped_task(self): + def add_skipped_task(self) -> None: self.skipped_tasks += 1 for cb in self._state._task_groups_changed_cbs: @@ -74,19 +79,120 @@ class TaskGroup: # Update the TaskGroup's list of failed tasks and notify of changes # # Args: - # full_name (str): The full name of the task, distinguishing - # it from other tasks with the same action name - # e.g. an element's name. + # full_name: The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. 
an element's name. # # This is a core-facing API and should not be called from the frontend # - def add_failed_task(self, full_name): + def add_failed_task(self, full_name: str) -> None: self.failed_tasks.append(full_name) for cb in self._state._task_groups_changed_cbs: cb() +# Task +# +# The state data stored for an individual task +# +# Args: +# state: The State object +# task_id: The unique identifier of the task +# action_name: The name of the action, e.g. 'build' +# full_name: The full name of the task, distinguishing +# it from other tasks with the same action name +# e.g. an element's name. +# elapsed_offset: The time the task started, relative to +# buildstream's start time. +class Task: + def __init__( + self, state: "State", task_id: str, action_name: str, full_name: str, elapsed_offset: datetime.timedelta + ) -> None: + + # + # Public members + # + self.id: str = task_id + self.action_name: str = action_name + self.full_name: str = full_name + self.elapsed_offset: datetime.timedelta = elapsed_offset + self.current_progress: Optional[int] = None + self.maximum_progress: Optional[int] = None + + # + # Private members + # + self._state: "State" = state + self._task_changed_cb: Optional[Callable[[], None]] = None # Callback to call when something could be rendered + + ############################################## + # Core-facing APIs for driving notifications # + ############################################## + + # set_task_changed_callback() + # + # Sets the callback to be called when this task has + # changed. + # + # This is just a convenience codepath for the Messenger object to + # run simple tasks outside of the scheduler context, rather + # than connecting to the State callbacks which are there for the + # purpose of the frontend to get notifications of task progress.
+ # + # Args: + # callback: The callback to call when progress changed + # + def set_task_changed_callback(self, callback: Optional[Callable[[], None]]) -> None: + self._task_changed_cb = callback + + # set_maximum_progress() + # + # Sets the maximum progress possible for this task. + # + # Args: + # progress: The maximum progress possible for this task + # + def set_maximum_progress(self, progress: int) -> None: + self.maximum_progress = progress + self._notify_task_changed() + + # set_current_progress() + # + # Sets the current progress of the task, this should + # be a number between 0 and the maximum progress, if a + # maximum progress has been set. + # + # Args: + # progress: The current progress + # + def set_current_progress(self, progress: int) -> None: + self.current_progress = progress + self._notify_task_changed() + + # add_current_progress() + # + # A convenience function for incrementing the current + # progress of this task by 1. + # + def add_current_progress(self) -> None: + if self.current_progress is None: + new_progress = 1 + else: + new_progress = self.current_progress + 1 + self.set_current_progress(new_progress) + + ############################################## + # Private methods # + ############################################## + def _notify_task_changed(self) -> None: + for cb in self._state._task_changed_cbs: + cb(self.id) + + if self._task_changed_cb: + self._task_changed_cb() + + # State # # The state data that is stored for the purpose of sharing with the frontend. @@ -96,23 +202,26 @@ class TaskGroup: # when parts of State change, and read State to know what has changed. # # Args: -# session_start (datetime): The time the session started +# session_start: The time the session started # class State: - def __init__(self, session_start): - self._session_start = session_start - - self.task_groups = OrderedDict() # key is TaskGroup name - - # Note: A Task's full_name is technically unique, but only accidentally. 
- self.tasks = OrderedDict() # key is a tuple of action_name and full_name - - self._task_added_cbs = [] - self._task_removed_cbs = [] - self._task_changed_cbs = [] - self._task_groups_changed_cbs = [] - self._task_failed_cbs = [] - self._task_retry_cbs = [] + def __init__(self, session_start: datetime.datetime) -> None: + + # + # Public members + # + self.task_groups: Dict[str, TaskGroup] = {} # Dictionary of active task groups by group name + self.tasks: Dict[str, Task] = {} # Dictionary of active tasks by unique task ID + + # + # Private members + # + self._session_start: datetime.datetime = session_start + self._task_added_cbs: List[Callable[[str], None]] = [] + self._task_removed_cbs: List[Callable[[str], None]] = [] + self._task_changed_cbs: List[Callable[[str], None]] = [] + self._task_failed_cbs: List[Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]] = [] + self._task_groups_changed_cbs: List[Callable[[], None]] = [] ##################################### # Frontend-facing notification APIs # @@ -123,12 +232,12 @@ class State: # Registers a callback to be notified when a task is added # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # # Callback Args: - # task_id (str): The unique identifier of the task + # task_id: The unique identifier of the task # - def register_task_added_callback(self, callback): + def register_task_added_callback(self, callback: Callable[[str], None]) -> None: self._task_added_cbs.append(callback) # unregister_task_added_callback() @@ -137,9 +246,9 @@ class State: # register_task_added_callback() # # Args: - # callback (function): The callback to be removed + # callback: The callback to be removed # - def unregister_task_added_callback(self, callback): + def unregister_task_added_callback(self, callback: Callable[[str], None]) -> None: self._task_added_cbs.remove(callback) # register_task_removed_callback() @@ -147,12 +256,12 @@ class State: # Registers a callback to be 
notified when a task is removed # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # # Callback Args: - # task_id (str): The unique identifier of the task + # task_id: The unique identifier of the task # - def register_task_removed_callback(self, callback): + def register_task_removed_callback(self, callback: Callable[[str], None]) -> None: self._task_removed_cbs.append(callback) # unregister_task_removed_callback() @@ -161,9 +270,9 @@ class State: # register_task_removed_callback() # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # - def unregister_task_removed_callback(self, callback): + def unregister_task_removed_callback(self, callback: Callable[[str], None]) -> None: self._task_removed_cbs.remove(callback) # register_task_changed_callback() @@ -171,12 +280,12 @@ class State: # Register a callback to be notified when a task has changed # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # # Callback Args: - # task_id (str): The unique identifier of the task + # task_id: The unique identifier of the task # - def register_task_changed_callback(self, callback): + def register_task_changed_callback(self, callback: Callable[[str], None]) -> None: self._task_changed_cbs.append(callback) # unregister_task_changed_callback() @@ -185,9 +294,9 @@ class State: # register_task_changed_callback() # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # - def unregister_task_changed_callback(self, callback): + def unregister_task_changed_callback(self, callback: Callable[[str], None]) -> None: self._task_changed_cbs.remove(callback) # register_task_failed_callback() @@ -198,11 +307,12 @@ class State: # callback (function): The callback to be notified # # Callback Args: - # task_id (str): The unique identifier of the task - # element (tuple): (optionally) The 
element unique_id and display keys if an - # element job + # task_id: The unique identifier of the task + # element: (optionally) The element unique_id and DisplayKey of an element job # - def register_task_failed_callback(self, callback): + def register_task_failed_callback( + self, callback: Callable[[str, Optional[Tuple[int, _DisplayKey]]], None] + ) -> None: self._task_failed_cbs.append(callback) # unregister_task_failed_callback() @@ -213,22 +323,34 @@ class State: # Args: # callback (function): The callback to be removed # - def unregister_task_failed_callback(self, callback): + def unregister_task_failed_callback( + self, callback: Callable[[str, Optional[Tuple[int, _DisplayKey]]], None] + ) -> None: self._task_failed_cbs.remove(callback) - # register_task_retry_callback() + # register_task_groups_changed_callback() # - # Registers a callback to be notified when a task is to be retried + # Registers a callback to be notified whenever the task groups info has changed # # Args: - # callback (function): The callback to be notified + # callback: The callback to be notified # # Callback Args: - # task_id (str): The unique identifier of the task - # unique_id: The unique id of the plugin instance to look up + # None, the callback is called without any arguments; the + # changed data can be read from State.task_groups + # + def register_task_groups_changed_callback(self, callback: Callable[[], None]) -> None: + self._task_groups_changed_cbs.append(callback) + + # unregister_task_groups_changed_callback() + # + # Unregisters a callback previously registered by register_task_groups_changed_callback() + # + # Args: + # callback (function): The callback to be removed # - def register_task_retry_callback(self, callback): - self._task_retry_cbs.append(callback) + def unregister_task_groups_changed_callback(self, callback: Callable[[], None]) -> None: + self._task_groups_changed_cbs.remove(callback) ############################################## #
Core-facing APIs for driving notifications # @@ -247,7 +369,7 @@ class State: # Returns: # TaskGroup: The task group created # - def add_task_group(self, name, complete_name=None): + def add_task_group(self, name, complete_name=None) -> TaskGroup: assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups) group = TaskGroup(name, self, complete_name) self.task_groups[name] = group @@ -263,7 +385,7 @@ class State: # Args: # name (str): The name of the task group, e.g. 'build' # - def remove_task_group(self, name): + def remove_task_group(self, name) -> None: # Rely on 'del' to raise an error when removing nonexistent task groups del self.task_groups[name] @@ -274,9 +396,9 @@ class State: # This is a core-facing API and should not be called from the frontend # # Args: - # task_id (str): The unique identifier of the task - # action_name (str): The name of the action, e.g. 'build' - # full_name (str): The full name of the task, distinguishing + # task_id: The unique identifier of the task + # action_name: The name of the action, e.g. 'build' + # full_name: The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # elapsed_offset (timedelta): (Optional) The time the task started, relative @@ -284,7 +406,12 @@ class State: # use this as they don't report relative to wallclock time # if the Scheduler has been suspended. 
# - def add_task(self, task_id, action_name, full_name, elapsed_offset=None): + # Returns: + # The new task + # + def add_task( + self, task_id: str, action_name: str, full_name: str, elapsed_offset: Optional[datetime.timedelta] = None + ) -> Task: assert task_id not in self.tasks, "Trying to add task '{}:{}' with ID '{}' to '{}'".format( action_name, full_name, task_id, self.tasks ) @@ -292,7 +419,7 @@ class State: if not elapsed_offset: elapsed_offset = self.elapsed_time() - task = _Task(self, task_id, action_name, full_name, elapsed_offset) + task = Task(self, task_id, action_name, full_name, elapsed_offset) self.tasks[task_id] = task for cb in self._task_added_cbs: @@ -307,9 +434,9 @@ class State: # This is a core-facing API and should not be called from the frontend # # Args: - # task_id (str): The unique identifier of the task + # task_id: The unique identifier of the task # - def remove_task(self, task_id): + def remove_task(self, task_id: str) -> None: # Rely on 'del' to raise an error when removing nonexistent tasks del self.tasks[task_id] @@ -326,41 +453,27 @@ class State: # This is a core-facing API and should not be called from the frontend # # Args: - # task_id (str): The unique identifier of the task - # element (tuple): (optionally) The element unique_id and display keys if an + # task_id: The unique identifier of the task + # element: (optionally) The element unique_id and display keys if an # element job # - def fail_task(self, task_id, element=None): + def fail_task(self, task_id: str, element: Optional[Tuple[int, _DisplayKey]] = None) -> None: for cb in self._task_failed_cbs: cb(task_id, element) - # retry_task() - # - # Notify all registered callbacks that a task is to be retried. 
- # - # This is a core-facing API and should not be called from the frontend - # - # Args: - # task_id (str): The unique identifier of the task - # unique_id: The unique id of the plugin instance to look up - # - def retry_task(self, task_id: str, unique_id: str) -> None: - for cb in self._task_retry_cbs: - cb(task_id, unique_id) - # elapsed_time() # # Fetches the current session elapsed time # # Args: - # start_time(time): Optional explicit start time, relative to caller. + # start_time: Optional explicit start time, relative to caller. # # Returns: - # (timedelta): The amount of time since the start of the session, - # discounting any time spent while jobs were suspended if - # start_time given relative to the Scheduler + # The amount of time since the start of the session, + # discounting any time spent while jobs were suspended if + # start_time given relative to the Scheduler # - def elapsed_time(self, start_time=None): + def elapsed_time(self, start_time: Optional[datetime.datetime] = None) -> datetime.timedelta: time_now = datetime.datetime.now() if start_time is None: start_time = self._session_start or time_now @@ -379,61 +492,3 @@ class State: # def offset_start_time(self, offset: datetime.timedelta) -> None: self._session_start += offset - - -# _Task -# -# The state data stored for an individual task -# -# Args: -# state (State): The State object -# task_id (str): The unique identifier of the task -# action_name (str): The name of the action, e.g. 'build' -# full_name (str): The full name of the task, distinguishing -# it from other tasks with the same action name -# e.g. an element's name. -# elapsed_offset (timedelta): The time the task started, relative to -# buildstream's start time. 
-class _Task: - def __init__(self, state, task_id, action_name, full_name, elapsed_offset): - self._state = state - self.id = task_id - self.action_name = action_name - self.full_name = full_name - self.elapsed_offset = elapsed_offset - self.current_progress = None - self.maximum_progress = None - - self._render_cb = None # Callback to call when something could be rendered - - # set_render_cb() - # - # Sets the callback to be called when the Task has changed and should be rendered - # - # NOTE: This should probably be removed once the frontend is running - # separately from the scheduler, since renders could be triggered - # by the scheduler. - def set_render_cb(self, callback): - self._render_cb = callback - - def set_current_progress(self, progress): - self.current_progress = progress - for cb in self._state._task_changed_cbs: - cb(self.id) - if self._render_cb: - self._render_cb() - - def set_maximum_progress(self, progress): - self.maximum_progress = progress - for cb in self._state._task_changed_cbs: - cb(self.id) - - if self._render_cb: - self._render_cb() - - def add_current_progress(self): - if self.current_progress is None: - new_progress = 1 - else: - new_progress = self.current_progress + 1 - self.set_current_progress(new_progress) diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index a5391562a..b50be2a0d 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1069,6 +1069,33 @@ class Stream: self._suspended = False self._scheduler.resume() + # retry_job() + # + # Retry the indicated job + # + # Args: + # action_name: The name of the action, e.g. 'build' + # unique_id: A unique_id to load an Element instance + # + def retry_job(self, action_name: str, unique_id: str) -> None: + element = Plugin._lookup(unique_id) + + # + # Update the state task group, remove the failed element + # + group = self._state.task_groups[action_name] + group.failed_tasks.remove(element._get_full_name()) + + # + # Find the queue for this
action name and requeue the element + # + queue = None + for q in self.queues: + if q.action_name == action_name: + queue = q + assert queue + queue.enqueue([element]) + ############################################################# # Private Methods # ############################################################# @@ -1346,19 +1373,6 @@ class Stream: self.session_elements += plan - # _failure_retry() - # - # Enqueues given element via unique_id to the specified queue - # matched against provided action_name & removes the related - # failed task from the tasks group. - # - # Args: - # task_id (str): The unique identifier of the task - # unique_id: A unique_id to load an Element instance - # - def _failure_retry(self, task_id: str, unique_id: str) -> None: - self._state.retry_task(task_id, unique_id) - # _run() # # Common function for running the scheduler |