summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan van Berkom <tristan@codethink.co.uk>2020-12-10 15:25:12 +0900
committerTristan van Berkom <tristan@codethink.co.uk>2020-12-10 18:12:37 +0900
commitcbc8e78fed4caa6b0f2b6590d7d4025c46c0e500 (patch)
tree30f8f3064c8b186c3db932978d80e16eabb72b0e
parentf9ac4815ebec826aa076bff5ffd45f7559a752c9 (diff)
downloadbuildstream-cbc8e78fed4caa6b0f2b6590d7d4025c46c0e500.tar.gz
Refactor: Use explicit invocation for retrying jobs.
We created the State object in the core for the purpose of advertizing state to the frontend, and the frontend can register callbacks and get updates to state changes (implicit invocation in the frontend), state always belongs to the core and the frontend can only read state. When the frontend asks the core to do something, this should always be done with an explicit function call, and preferably not via the State object, as this confuses the use of state, which is only a readonly state advertizing desk. This was broken (implemented backwards) for job retries, instead we had the frontend telling state "It has been requested that this job be retried !", and then we had the core registering callbacks to that frontend request - this direction of implicit invocation should not happen (the core should never have to register callbacks on the State object at all in fact). Summary of changes: * _stream.py: Change _failure_retry(), which was for some reason private albeit called from the frontend, to an explicit function call named "retry_job()". Instead of calling into the State object and causing core-side callbacks to be triggered, later to be handled by the Scheduler, implement the retry directly from the Stream, since this implementation deals only with Queues and State, which already directly belong to the Stream object, there is no reason to trouble the Scheduler with this. * _scheduler.py: Remove the callback handling the State "task retry" event. * _state.py: Remove the task retry callback chain completely. * _frontend/app.py: Call stream.retry_job() instead of stream.failure_retry(), now passing along the task's action name rather than the task's ID. This API now assumes that Stream.retry_job() can only be called on a task which originates from a scheduler Queue, and expects to be given the action name of the queue in which the given element has failed and should be retried..
-rw-r--r--src/buildstream/_frontend/app.py2
-rw-r--r--src/buildstream/_scheduler/scheduler.py15
-rw-r--r--src/buildstream/_state.py29
-rw-r--r--src/buildstream/_stream.py40
4 files changed, 28 insertions, 58 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 2f4235733..993747041 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -732,7 +732,7 @@ class App:
elif choice == "retry":
click.echo("\nRetrying failed job\n", err=True)
unique_id = element[0]
- self.stream._failure_retry(task.id, unique_id)
+ self.stream.retry_job(task.action_name, unique_id)
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 7acb062d0..23abbe46d 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -113,7 +113,6 @@ class Scheduler:
self._interrupt_callback = interrupt_callback
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
- self._state.register_task_retry_callback(self._failure_retry)
# Ensure that the forkserver is started before we start.
# This is best run before we do any GRPC connections to casd or have
@@ -517,20 +516,6 @@ class Scheduler:
self._ticker_callback()
self.loop.call_later(1, self._tick)
- def _failure_retry(self, task_id, unique_id):
- task = self._state.tasks[task_id]
-
- queue = None
- for q in self.queues:
- if q.action_name == task.action_name:
- queue = q
- break
- # Assert queue found, we should only be retrying a queued job
- assert queue
- element = Plugin._lookup(unique_id)
- queue._task_group.failed_tasks.remove(element._get_full_name())
- queue.enqueue([element])
-
def _handle_exception(self, loop, context: dict) -> None:
e = context.get("exception")
exc = bool(e)
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index 0233dd323..773aa2146 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -112,7 +112,6 @@ class State:
self._task_changed_cbs = []
self._task_groups_changed_cbs = []
self._task_failed_cbs = []
- self._task_retry_cbs = []
#####################################
# Frontend-facing notification APIs #
@@ -216,20 +215,6 @@ class State:
def unregister_task_failed_callback(self, callback):
self._task_failed_cbs.remove(callback)
- # register_task_retry_callback()
- #
- # Registers a callback to be notified when a task is to be retried
- #
- # Args:
- # callback (function): The callback to be notified
- #
- # Callback Args:
- # task_id (str): The unique identifier of the task
- # unique_id: The unique id of the plugin instance to look up
- #
- def register_task_retry_callback(self, callback):
- self._task_retry_cbs.append(callback)
-
##############################################
# Core-facing APIs for driving notifications #
##############################################
@@ -334,20 +319,6 @@ class State:
for cb in self._task_failed_cbs:
cb(task_id, element)
- # retry_task()
- #
- # Notify all registered callbacks that a task is to be retried.
- #
- # This is a core-facing API and should not be called from the frontend
- #
- # Args:
- # task_id (str): The unique identifier of the task
- # unique_id: The unique id of the plugin instance to look up
- #
- def retry_task(self, task_id: str, unique_id: str) -> None:
- for cb in self._task_retry_cbs:
- cb(task_id, unique_id)
-
# elapsed_time()
#
# Fetches the current session elapsed time
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a5391562a..b50be2a0d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1069,6 +1069,33 @@ class Stream:
self._suspended = False
self._scheduler.resume()
+ # retry_job()
+ #
+ # Retry the indicated job
+ #
+ # Args:
+ # action_name: The unique identifier of the task
+ # unique_id: A unique_id to load an Element instance
+ #
+ def retry_job(self, action_name: str, unique_id: str) -> None:
+ element = Plugin._lookup(unique_id)
+
+ #
+ # Update the state task group, remove the failed element
+ #
+ group = self._state.task_groups[action_name]
+ group.failed_tasks.remove(element._get_full_name())
+
+ #
+ # Find the queue for this action name and requeue the element
+ #
+ queue = None
+ for q in self.queues:
+ if q.action_name == action_name:
+ queue = q
+ assert queue
+ queue.enqueue([element])
+
#############################################################
# Private Methods #
#############################################################
@@ -1346,19 +1373,6 @@ class Stream:
self.session_elements += plan
- # _failure_retry()
- #
- # Enqueues given element via unique_id to the specified queue
- # matched against provided action_name & removes the related
- # failed task from the tasks group.
- #
- # Args:
- # task_id (str): The unique identifier of the task
- # unique_id: A unique_id to load an Element instance
- #
- def _failure_retry(self, task_id: str, unique_id: str) -> None:
- self._state.retry_task(task_id, unique_id)
-
# _run()
#
# Common function for running the scheduler