diff options
author | Tristan van Berkom <tristan@codethink.co.uk> | 2020-12-10 15:25:12 +0900 |
---|---|---|
committer | Tristan van Berkom <tristan@codethink.co.uk> | 2020-12-10 18:12:37 +0900 |
commit | cbc8e78fed4caa6b0f2b6590d7d4025c46c0e500 (patch) | |
tree | 30f8f3064c8b186c3db932978d80e16eabb72b0e /src | |
parent | f9ac4815ebec826aa076bff5ffd45f7559a752c9 (diff) | |
download | buildstream-cbc8e78fed4caa6b0f2b6590d7d4025c46c0e500.tar.gz |
Refactor: Use explicit invocation for retrying jobs.
We created the State object in the core for the purpose of advertising
state to the frontend; the frontend can register callbacks and get
updates to state changes (implicit invocation in the frontend). State
always belongs to the core, and the frontend can only read state.
When the frontend asks the core to do something, this should always
be done with an explicit function call, and preferably not via the
State object, as this confuses the use of state, which is only a read-only
state advertising desk.
This was broken (implemented backwards) for job retries: instead we had
the frontend telling state "It has been requested that this job be retried!",
and then we had the core registering callbacks to that frontend request - this
direction of implicit invocation should not happen (the core should never
have to register callbacks on the State object at all in fact).
Summary of changes:
* _stream.py: Change _failure_retry(), which was for some reason
private despite being called from the frontend, to an explicit function
call named "retry_job()".
Instead of calling into the State object and causing core-side
callbacks to be triggered, later to be handled by the Scheduler,
implement the retry directly from the Stream, since this implementation
deals only with Queues and State, which already directly belong to
the Stream object, there is no reason to trouble the Scheduler
with this.
* _scheduler.py: Remove the callback handling the State "task retry"
event.
* _state.py: Remove the task retry callback chain completely.
* _frontend/app.py: Call stream.retry_job() instead of
stream.failure_retry(), now passing along the task's action name
rather than the task's ID.
This API now assumes that Stream.retry_job() can only be called on
a task which originates from a scheduler Queue, and expects to be
given the action name of the queue in which the given element has
failed and should be retried.
Diffstat (limited to 'src')
-rw-r--r-- | src/buildstream/_frontend/app.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 15 | ||||
-rw-r--r-- | src/buildstream/_state.py | 29 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 40 |
4 files changed, 28 insertions, 58 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 2f4235733..993747041 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -732,7 +732,7 @@ class App: elif choice == "retry": click.echo("\nRetrying failed job\n", err=True) unique_id = element[0] - self.stream._failure_retry(task.id, unique_id) + self.stream.retry_job(task.action_name, unique_id) # # Print the session heading if we've loaded a pipeline and there diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 7acb062d0..23abbe46d 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -113,7 +113,6 @@ class Scheduler: self._interrupt_callback = interrupt_callback self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) - self._state.register_task_retry_callback(self._failure_retry) # Ensure that the forkserver is started before we start. 
# This is best run before we do any GRPC connections to casd or have @@ -517,20 +516,6 @@ class Scheduler: self._ticker_callback() self.loop.call_later(1, self._tick) - def _failure_retry(self, task_id, unique_id): - task = self._state.tasks[task_id] - - queue = None - for q in self.queues: - if q.action_name == task.action_name: - queue = q - break - # Assert queue found, we should only be retrying a queued job - assert queue - element = Plugin._lookup(unique_id) - queue._task_group.failed_tasks.remove(element._get_full_name()) - queue.enqueue([element]) - def _handle_exception(self, loop, context: dict) -> None: e = context.get("exception") exc = bool(e) diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py index 0233dd323..773aa2146 100644 --- a/src/buildstream/_state.py +++ b/src/buildstream/_state.py @@ -112,7 +112,6 @@ class State: self._task_changed_cbs = [] self._task_groups_changed_cbs = [] self._task_failed_cbs = [] - self._task_retry_cbs = [] ##################################### # Frontend-facing notification APIs # @@ -216,20 +215,6 @@ class State: def unregister_task_failed_callback(self, callback): self._task_failed_cbs.remove(callback) - # register_task_retry_callback() - # - # Registers a callback to be notified when a task is to be retried - # - # Args: - # callback (function): The callback to be notified - # - # Callback Args: - # task_id (str): The unique identifier of the task - # unique_id: The unique id of the plugin instance to look up - # - def register_task_retry_callback(self, callback): - self._task_retry_cbs.append(callback) - ############################################## # Core-facing APIs for driving notifications # ############################################## @@ -334,20 +319,6 @@ class State: for cb in self._task_failed_cbs: cb(task_id, element) - # retry_task() - # - # Notify all registered callbacks that a task is to be retried. 
- # - # This is a core-facing API and should not be called from the frontend - # - # Args: - # task_id (str): The unique identifier of the task - # unique_id: The unique id of the plugin instance to look up - # - def retry_task(self, task_id: str, unique_id: str) -> None: - for cb in self._task_retry_cbs: - cb(task_id, unique_id) - # elapsed_time() # # Fetches the current session elapsed time diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index a5391562a..b50be2a0d 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1069,6 +1069,33 @@ class Stream: self._suspended = False self._scheduler.resume() + # retry_job() + # + # Retry the indicated job + # + # Args: + # action_name: The unique identifier of the task + # unique_id: A unique_id to load an Element instance + # + def retry_job(self, action_name: str, unique_id: str) -> None: + element = Plugin._lookup(unique_id) + + # + # Update the state task group, remove the failed element + # + group = self._state.task_groups[action_name] + group.failed_tasks.remove(element._get_full_name()) + + # + # Find the queue for this action name and requeue the element + # + queue = None + for q in self.queues: + if q.action_name == action_name: + queue = q + assert queue + queue.enqueue([element]) + ############################################################# # Private Methods # ############################################################# @@ -1346,19 +1373,6 @@ class Stream: self.session_elements += plan - # _failure_retry() - # - # Enqueues given element via unique_id to the specified queue - # matched against provided action_name & removes the related - # failed task from the tasks group. - # - # Args: - # task_id (str): The unique identifier of the task - # unique_id: A unique_id to load an Element instance - # - def _failure_retry(self, task_id: str, unique_id: str) -> None: - self._state.retry_task(task_id, unique_id) - # _run() # # Common function for running the scheduler |