summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.van.berkom@gmail.com>2020-12-10 12:21:25 +0000
committerTristan Van Berkom <tristan.van.berkom@gmail.com>2020-12-10 12:21:25 +0000
commitae3344b474b60e2bd93e8e0636e2f56b505f0f73 (patch)
tree302c3436b66cdb20ae8013bd2af537c67b036b0c
parent270439458a2c34486d1907aec53844cd29bfe2d6 (diff)
parent4c7656f1c53b1943896f70fb9a2e35e1f8a3fbd3 (diff)
downloadbuildstream-ae3344b474b60e2bd93e8e0636e2f56b505f0f73.tar.gz
Merge branch 'tristan/refactor-retry-task' into 'master'
Refactor State object See merge request BuildStream/buildstream!2115
-rw-r--r--src/buildstream/_artifactelement.py4
-rw-r--r--src/buildstream/_frontend/app.py2
-rw-r--r--src/buildstream/_messenger.py2
-rw-r--r--src/buildstream/_scheduler/scheduler.py15
-rw-r--r--src/buildstream/_state.py361
-rw-r--r--src/buildstream/_stream.py40
6 files changed, 239 insertions, 185 deletions
diff --git a/src/buildstream/_artifactelement.py b/src/buildstream/_artifactelement.py
index 63bb904fd..94503c4df 100644
--- a/src/buildstream/_artifactelement.py
+++ b/src/buildstream/_artifactelement.py
@@ -32,7 +32,7 @@ from .node import Node
if TYPE_CHECKING:
from ._context import Context
- from ._state import _Task
+ from ._state import Task
# ArtifactElement()
@@ -81,7 +81,7 @@ class ArtifactElement(Element):
# (ArtifactElement): A newly created Element instance
#
@classmethod
- def new_from_artifact_name(cls, artifact_name: str, context: "Context", task: Optional["_Task"] = None):
+ def new_from_artifact_name(cls, artifact_name: str, context: "Context", task: Optional["Task"] = None):
# Initial lookup for already loaded artifact.
with suppress(KeyError):
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 2f4235733..993747041 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -732,7 +732,7 @@ class App:
elif choice == "retry":
click.echo("\nRetrying failed job\n", err=True)
unique_id = element[0]
- self.stream._failure_retry(task.id, unique_id)
+ self.stream.retry_job(task.action_name, unique_id)
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 69a309f91..f18d3dc92 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -212,7 +212,7 @@ class Messenger:
self.message(message)
task = self._state.add_task(task_name, activity_name, task_name)
- task.set_render_cb(self._render_status)
+ task.set_task_changed_callback(self._render_status)
self._active_simple_tasks += 1
if not self._next_render:
self._next_render = datetime.datetime.now() + _RENDER_INTERVAL
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 7acb062d0..23abbe46d 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -113,7 +113,6 @@ class Scheduler:
self._interrupt_callback = interrupt_callback
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
- self._state.register_task_retry_callback(self._failure_retry)
# Ensure that the forkserver is started before we start.
# This is best run before we do any GRPC connections to casd or have
@@ -517,20 +516,6 @@ class Scheduler:
self._ticker_callback()
self.loop.call_later(1, self._tick)
- def _failure_retry(self, task_id, unique_id):
- task = self._state.tasks[task_id]
-
- queue = None
- for q in self.queues:
- if q.action_name == task.action_name:
- queue = q
- break
- # Assert queue found, we should only be retrying a queued job
- assert queue
- element = Plugin._lookup(unique_id)
- queue._task_group.failed_tasks.remove(element._get_full_name())
- queue.enqueue([element])
-
def _handle_exception(self, loop, context: dict) -> None:
e = context.get("exception")
exc = bool(e)
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index 4a552b0b9..ddcbb09b9 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -16,7 +16,8 @@
#
import datetime
-from collections import OrderedDict
+from typing import Optional, Tuple, List, Dict, Callable
+from .types import _DisplayKey
# TaskGroup
@@ -24,23 +25,27 @@ from collections import OrderedDict
# The state data stored for a group of tasks (usually scheduler queues)
#
# Args:
-# name (str): The name of the Task Group, e.g. 'build'
-# state (State): The state object
-# complete_name (str): Optional name for frontend status rendering, e.g. 'built'
+# name: The name of the Task Group, e.g. 'build'
+# state: The state object
+# complete_name: Optional name for frontend status rendering, e.g. 'built'
#
class TaskGroup:
- def __init__(self, name, state, complete_name=None):
- self.name = name
- self.complete_name = complete_name
- self.processed_tasks = 0
- self.skipped_tasks = 0
- # NOTE: failed_tasks is a list of strings instead of an integer count
- # because the frontend requires the full list of failed tasks to
- # know whether to print failure messages for a given element.
- self.failed_tasks = []
-
- self._state = state
- self._update_task_group_cbs = []
+ def __init__(self, name: str, state: "State", complete_name: Optional[str] = None) -> None:
+
+ #
+ # Public members
+ #
+ self.name: str = name # The name of tasks in this group
+ self.complete_name: Optional[str] = complete_name # Optional name for frontend status rendering, e.g. 'built'
+
+ self.processed_tasks: int = 0 # Number of processed tasks
+ self.skipped_tasks: int = 0 # Number of skipped tasks
+ self.failed_tasks: List[str] = [] # List of element full names which failed
+
+ #
+ # Private members
+ #
+ self._state: "State" = state
###########################################
# Core-facing APIs to drive notifications #
@@ -52,7 +57,7 @@ class TaskGroup:
#
# This is a core-facing API and should not be called from the frontend
#
- def add_processed_task(self):
+ def add_processed_task(self) -> None:
self.processed_tasks += 1
for cb in self._state._task_groups_changed_cbs:
cb()
@@ -63,7 +68,7 @@ class TaskGroup:
#
# This is a core-facing API and should not be called from the frontend
#
- def add_skipped_task(self):
+ def add_skipped_task(self) -> None:
self.skipped_tasks += 1
for cb in self._state._task_groups_changed_cbs:
@@ -74,19 +79,120 @@ class TaskGroup:
# Update the TaskGroup's list of failed tasks and notify of changes
#
# Args:
- # full_name (str): The full name of the task, distinguishing
- # it from other tasks with the same action name
- # e.g. an element's name.
+ # full_name: The full name of the task, distinguishing
+ # it from other tasks with the same action name
+ # e.g. an element's name.
#
# This is a core-facing API and should not be called from the frontend
#
- def add_failed_task(self, full_name):
+ def add_failed_task(self, full_name: str) -> None:
self.failed_tasks.append(full_name)
for cb in self._state._task_groups_changed_cbs:
cb()
+# Task
+#
+# The state data stored for an individual task
+#
+# Args:
+# state: The State object
+# task_id: The unique identifier of the task
+# action_name: The name of the action, e.g. 'build'
+# full_name: The full name of the task, distinguishing
+# it from other tasks with the same action name
+# e.g. an element's name.
+# elapsed_offset: The time the task started, relative to
+# buildstream's start time.
+class Task:
+ def __init__(
+ self, state: "State", task_id: str, action_name: str, full_name: str, elapsed_offset: datetime.timedelta
+ ) -> None:
+
+ #
+ # Public members
+ #
+ self.id: str = task_id
+ self.action_name: str = action_name
+ self.full_name: str = full_name
+ self.elapsed_offset: datetime.timedelta = elapsed_offset
+ self.current_progress: Optional[int] = None
+ self.maximum_progress: Optional[int] = None
+
+ #
+ # Private members
+ #
+ self._state: "State" = state
+ self._task_changed_cb: Optional[Callable[[], None]] = None # Callback to call when something could be rendered
+
+ ##############################################
+ # Core-facing APIs for driving notifications #
+ ##############################################
+
+ # set_task_changed_callback()
+ #
+ # Sets the callback to be called when this task has
+ # changed.
+ #
+ # This is just a convenience codepath for the Messenger object
+ # run simple tasks outside of the scheduler context, rather
+ # than connecting to the State callbacks which are there for the
+ # purpose of the frontend to get notifications of task progress.
+ #
+ # Args:
+ # callback: The callback to call when progress changed
+ #
+ def set_task_changed_callback(self, callback: Optional[Callable[[], None]]) -> None:
+ self._task_changed_cb = callback
+
+ # set_maximum_progress()
+ #
+ # Sets the maximum progress possible for this task.
+ #
+ # Args:
+ # progress: The maximum progress possible for this task
+ #
+ def set_maximum_progress(self, progress: int) -> None:
+ self.maximum_progress = progress
+ self._notify_task_changed()
+
+ # set_current_progress()
+ #
+ # Sets the current progress of the task, this should
+ # be a number between 0 and the maximum progress, if a
+ # maximum progress has been set.
+ #
+ # Args:
+ # progress: The current progress
+ #
+ def set_current_progress(self, progress: int) -> None:
+ self.current_progress = progress
+ self._notify_task_changed()
+
+ # add_current_progress()
+ #
+ # A convenience function for incrementing the current
+ # progress of this task by 1.
+ #
+ def add_current_progress(self) -> None:
+ if self.current_progress is None:
+ new_progress = 1
+ else:
+ new_progress = self.current_progress + 1
+ self.set_current_progress(new_progress)
+
+ ##############################################
+ # Private methods #
+ ##############################################
+ def _notify_task_changed(self) -> None:
+ for cb in self._state._task_changed_cbs:
+ cb(self.id)
+
+ if self._task_changed_cb:
+ self._task_changed_cb()
+
+
# State
#
# The state data that is stored for the purpose of sharing with the frontend.
@@ -96,23 +202,26 @@ class TaskGroup:
# when parts of State change, and read State to know what has changed.
#
# Args:
-# session_start (datetime): The time the session started
+# session_start: The time the session started
#
class State:
- def __init__(self, session_start):
- self._session_start = session_start
-
- self.task_groups = OrderedDict() # key is TaskGroup name
-
- # Note: A Task's full_name is technically unique, but only accidentally.
- self.tasks = OrderedDict() # key is a tuple of action_name and full_name
-
- self._task_added_cbs = []
- self._task_removed_cbs = []
- self._task_changed_cbs = []
- self._task_groups_changed_cbs = []
- self._task_failed_cbs = []
- self._task_retry_cbs = []
+ def __init__(self, session_start: datetime.datetime) -> None:
+
+ #
+ # Public members
+ #
+ self.task_groups: Dict[str, TaskGroup] = {} # Dictionary of active task groups by group name
+ self.tasks: Dict[str, Task] = {} # Dictionary of active tasks by unique task ID
+
+ #
+ # Private members
+ #
+ self._session_start: datetime.datetime = session_start
+ self._task_added_cbs: List[Callable[[str], None]] = []
+ self._task_removed_cbs: List[Callable[[str], None]] = []
+ self._task_changed_cbs: List[Callable[[str], None]] = []
+ self._task_failed_cbs: List[Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]] = []
+ self._task_groups_changed_cbs: List[Callable[[], None]] = []
#####################################
# Frontend-facing notification APIs #
@@ -123,12 +232,12 @@ class State:
# Registers a callback to be notified when a task is added
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
# Callback Args:
- # task_id (str): The unique identifier of the task
+ # task_id: The unique identifier of the task
#
- def register_task_added_callback(self, callback):
+ def register_task_added_callback(self, callback: Callable[[str], None]) -> None:
self._task_added_cbs.append(callback)
# unregister_task_added_callback()
@@ -137,9 +246,9 @@ class State:
# register_task_added_callback()
#
# Args:
- # callback (function): The callback to be removed
+ # callback: The callback to be removed
#
- def unregister_task_added_callback(self, callback):
+ def unregister_task_added_callback(self, callback: Callable[[str], None]) -> None:
self._task_added_cbs.remove(callback)
# register_task_removed_callback()
@@ -147,12 +256,12 @@ class State:
# Registers a callback to be notified when a task is removed
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
# Callback Args:
- # task_id (str): The unique identifier of the task
+ # task_id: The unique identifier of the task
#
- def register_task_removed_callback(self, callback):
+ def register_task_removed_callback(self, callback: Callable[[str], None]) -> None:
self._task_removed_cbs.append(callback)
# unregister_task_removed_callback()
@@ -161,9 +270,9 @@ class State:
# register_task_removed_callback()
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
- def unregister_task_removed_callback(self, callback):
+ def unregister_task_removed_callback(self, callback: Callable[[str], None]) -> None:
self._task_removed_cbs.remove(callback)
# register_task_changed_callback()
@@ -171,12 +280,12 @@ class State:
# Register a callback to be notified when a task has changed
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
# Callback Args:
- # task_id (str): The unique identifier of the task
+ # task_id: The unique identifier of the task
#
- def register_task_changed_callback(self, callback):
+ def register_task_changed_callback(self, callback: Callable[[str], None]) -> None:
self._task_changed_cbs.append(callback)
# unregister_task_changed_callback()
@@ -185,9 +294,9 @@ class State:
# register_task_changed_callback()
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
- def unregister_task_changed_callback(self, callback):
+ def unregister_task_changed_callback(self, callback: Callable[[str], None]) -> None:
self._task_changed_cbs.remove(callback)
# register_task_failed_callback()
@@ -198,11 +307,12 @@ class State:
# callback (function): The callback to be notified
#
# Callback Args:
- # task_id (str): The unique identifier of the task
- # element (tuple): (optionally) The element unique_id and display keys if an
- # element job
+ # task_id: The unique identifier of the task
+ # element: (optionally) The element unique_id and DisplayKey of an element job
#
- def register_task_failed_callback(self, callback):
+ def register_task_failed_callback(
+ self, callback: Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]
+ ) -> None:
self._task_failed_cbs.append(callback)
# unregister_task_failed_callback()
@@ -213,22 +323,34 @@ class State:
# Args:
# callback (function): The callback to be removed
#
- def unregister_task_failed_callback(self, callback):
+ def unregister_task_failed_callback(
+ self, callback: Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]
+ ) -> None:
self._task_failed_cbs.remove(callback)
- # register_task_retry_callback()
+ # register_task_groups_changed_callback()
#
- # Registers a callback to be notified when a task is to be retried
+ # Registers a callback to be notified whenever the task groups info has changed
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
# Callback Args:
- # task_id (str): The unique identifier of the task
- # unique_id: The unique id of the plugin instance to look up
+ # task_id: The unique identifier of the task
+ # element: (optionally) The element unique_id and DisplayKey of an element job
+ #
+ def register_task_groups_changed_callback(self, callback: Callable[[], None]) -> None:
+ self._task_groups_changed_cbs.append(callback)
+
+ # unregister_task_groups_changed_callback()
+ #
+ # Unregisters a callback previously registered by register_task_groups_changed_callback()
+ #
+ # Args:
+ # callback (function): The callback to be removed
#
- def register_task_retry_callback(self, callback):
- self._task_retry_cbs.append(callback)
+ def unregister_task_groups_changed_callback(self, callback: Callable[[], None]) -> None:
+ self._task_groups_changed_cbs.remove(callback)
##############################################
# Core-facing APIs for driving notifications #
@@ -247,7 +369,7 @@ class State:
# Returns:
# TaskGroup: The task group created
#
- def add_task_group(self, name, complete_name=None):
+ def add_task_group(self, name, complete_name=None) -> TaskGroup:
assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups)
group = TaskGroup(name, self, complete_name)
self.task_groups[name] = group
@@ -263,7 +385,7 @@ class State:
# Args:
# name (str): The name of the task group, e.g. 'build'
#
- def remove_task_group(self, name):
+ def remove_task_group(self, name) -> None:
# Rely on 'del' to raise an error when removing nonexistent task groups
del self.task_groups[name]
@@ -274,9 +396,9 @@ class State:
# This is a core-facing API and should not be called from the frontend
#
# Args:
- # task_id (str): The unique identifier of the task
- # action_name (str): The name of the action, e.g. 'build'
- # full_name (str): The full name of the task, distinguishing
+ # task_id: The unique identifier of the task
+ # action_name: The name of the action, e.g. 'build'
+ # full_name: The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# elapsed_offset (timedelta): (Optional) The time the task started, relative
@@ -284,7 +406,12 @@ class State:
# use this as they don't report relative to wallclock time
# if the Scheduler has been suspended.
#
- def add_task(self, task_id, action_name, full_name, elapsed_offset=None):
+ # Returns:
+ # The new task
+ #
+ def add_task(
+ self, task_id: str, action_name: str, full_name: str, elapsed_offset: Optional[datetime.timedelta] = None
+ ) -> Task:
assert task_id not in self.tasks, "Trying to add task '{}:{}' with ID '{}' to '{}'".format(
action_name, full_name, task_id, self.tasks
)
@@ -292,7 +419,7 @@ class State:
if not elapsed_offset:
elapsed_offset = self.elapsed_time()
- task = _Task(self, task_id, action_name, full_name, elapsed_offset)
+ task = Task(self, task_id, action_name, full_name, elapsed_offset)
self.tasks[task_id] = task
for cb in self._task_added_cbs:
@@ -307,9 +434,9 @@ class State:
# This is a core-facing API and should not be called from the frontend
#
# Args:
- # task_id (str): The unique identifier of the task
+ # task_id: The unique identifier of the task
#
- def remove_task(self, task_id):
+ def remove_task(self, task_id: str) -> None:
# Rely on 'del' to raise an error when removing nonexistent tasks
del self.tasks[task_id]
@@ -326,41 +453,27 @@ class State:
# This is a core-facing API and should not be called from the frontend
#
# Args:
- # task_id (str): The unique identifier of the task
- # element (tuple): (optionally) The element unique_id and display keys if an
+ # task_id: The unique identifier of the task
+ # element: (optionally) The element unique_id and display keys if an
# element job
#
- def fail_task(self, task_id, element=None):
+ def fail_task(self, task_id: str, element: Optional[Tuple[int, _DisplayKey]] = None) -> None:
for cb in self._task_failed_cbs:
cb(task_id, element)
- # retry_task()
- #
- # Notify all registered callbacks that a task is to be retried.
- #
- # This is a core-facing API and should not be called from the frontend
- #
- # Args:
- # task_id (str): The unique identifier of the task
- # unique_id: The unique id of the plugin instance to look up
- #
- def retry_task(self, task_id: str, unique_id: str) -> None:
- for cb in self._task_retry_cbs:
- cb(task_id, unique_id)
-
# elapsed_time()
#
# Fetches the current session elapsed time
#
# Args:
- # start_time(time): Optional explicit start time, relative to caller.
+ # start_time: Optional explicit start time, relative to caller.
#
# Returns:
- # (timedelta): The amount of time since the start of the session,
- # discounting any time spent while jobs were suspended if
- # start_time given relative to the Scheduler
+ # The amount of time since the start of the session,
+ # discounting any time spent while jobs were suspended if
+ # start_time given relative to the Scheduler
#
- def elapsed_time(self, start_time=None):
+ def elapsed_time(self, start_time: Optional[datetime.datetime] = None) -> datetime.timedelta:
time_now = datetime.datetime.now()
if start_time is None:
start_time = self._session_start or time_now
@@ -379,61 +492,3 @@ class State:
#
def offset_start_time(self, offset: datetime.timedelta) -> None:
self._session_start += offset
-
-
-# _Task
-#
-# The state data stored for an individual task
-#
-# Args:
-# state (State): The State object
-# task_id (str): The unique identifier of the task
-# action_name (str): The name of the action, e.g. 'build'
-# full_name (str): The full name of the task, distinguishing
-# it from other tasks with the same action name
-# e.g. an element's name.
-# elapsed_offset (timedelta): The time the task started, relative to
-# buildstream's start time.
-class _Task:
- def __init__(self, state, task_id, action_name, full_name, elapsed_offset):
- self._state = state
- self.id = task_id
- self.action_name = action_name
- self.full_name = full_name
- self.elapsed_offset = elapsed_offset
- self.current_progress = None
- self.maximum_progress = None
-
- self._render_cb = None # Callback to call when something could be rendered
-
- # set_render_cb()
- #
- # Sets the callback to be called when the Task has changed and should be rendered
- #
- # NOTE: This should probably be removed once the frontend is running
- # separately from the scheduler, since renders could be triggered
- # by the scheduler.
- def set_render_cb(self, callback):
- self._render_cb = callback
-
- def set_current_progress(self, progress):
- self.current_progress = progress
- for cb in self._state._task_changed_cbs:
- cb(self.id)
- if self._render_cb:
- self._render_cb()
-
- def set_maximum_progress(self, progress):
- self.maximum_progress = progress
- for cb in self._state._task_changed_cbs:
- cb(self.id)
-
- if self._render_cb:
- self._render_cb()
-
- def add_current_progress(self):
- if self.current_progress is None:
- new_progress = 1
- else:
- new_progress = self.current_progress + 1
- self.set_current_progress(new_progress)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a5391562a..b50be2a0d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1069,6 +1069,33 @@ class Stream:
self._suspended = False
self._scheduler.resume()
+ # retry_job()
+ #
+ # Retry the indicated job
+ #
+ # Args:
+ # action_name: The unique identifier of the task
+ # unique_id: A unique_id to load an Element instance
+ #
+ def retry_job(self, action_name: str, unique_id: str) -> None:
+ element = Plugin._lookup(unique_id)
+
+ #
+ # Update the state task group, remove the failed element
+ #
+ group = self._state.task_groups[action_name]
+ group.failed_tasks.remove(element._get_full_name())
+
+ #
+ # Find the queue for this action name and requeue the element
+ #
+ queue = None
+ for q in self.queues:
+ if q.action_name == action_name:
+ queue = q
+ assert queue
+ queue.enqueue([element])
+
#############################################################
# Private Methods #
#############################################################
@@ -1346,19 +1373,6 @@ class Stream:
self.session_elements += plan
- # _failure_retry()
- #
- # Enqueues given element via unique_id to the specified queue
- # matched against provided action_name & removes the related
- # failed task from the tasks group.
- #
- # Args:
- # task_id (str): The unique identifier of the task
- # unique_id: A unique_id to load an Element instance
- #
- def _failure_retry(self, task_id: str, unique_id: str) -> None:
- self._state.retry_task(task_id, unique_id)
-
# _run()
#
# Common function for running the scheduler