From 46556cb03d4f5ddfd36cada6c92c5030e45f3447 Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Sat, 4 Jul 2020 17:32:18 +0000 Subject: scheduler.py: Pass all 'retry' operations through the state Stop using 'Notifications' for retries, the state is the one handling the callbacks required for every change in status of elements --- src/buildstream/_scheduler/scheduler.py | 4 +--- src/buildstream/_state.py | 32 ++++++++++++++++++++++++++++++++ src/buildstream/_stream.py | 9 ++++----- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 38d656623..1a4ac2e2c 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -63,7 +63,6 @@ class NotificationType(FastEnum): SUSPEND = "suspend" UNSUSPEND = "unsuspend" SUSPENDED = "suspended" - RETRY = "retry" # Notification() @@ -146,6 +145,7 @@ class Scheduler: self._notifier = notifier self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) + self._state.register_task_retry_callback(self._failure_retry) # run() # @@ -583,8 +583,6 @@ class Scheduler: self.jobs_suspended() elif notification.notification_type == NotificationType.UNSUSPEND: self.jobs_unsuspended() - elif notification.notification_type == NotificationType.RETRY: - self._failure_retry(notification.job_action, notification.element) else: # Do not raise exception once scheduler process is separated # as we don't want to pickle exceptions between processes diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py index b8479e2b9..6e08c004d 100644 --- a/src/buildstream/_state.py +++ b/src/buildstream/_state.py @@ -112,6 +112,7 @@ class State: self._task_changed_cbs = [] self._task_groups_changed_cbs = [] self._task_failed_cbs = [] + self._task_retry_cbs = [] ##################################### # Frontend-facing notification APIs # @@ -226,6 +227,23 @@ class State: def unregister_task_failed_callback(self, callback): self._task_failed_cbs.remove(callback) + # register_task_retry_callback() + # + # Registers a callback to be notified when a task is to be retried + # + # Args: + # callback (function): The callback to be notified + # + # Callback Args: + # action_name (str): The name of the action, e.g. 'build' + # full_name (str): The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. an element's name. + # element_job (bool): (optionally) If an element job failed. + # + def register_task_retry_callback(self, callback): + self._task_retry_cbs.append(callback) + ############################################## # Core-facing APIs for driving notifications # ############################################## @@ -336,6 +354,20 @@ class State: for cb in self._task_failed_cbs: cb(action_name, full_name, element) + # retry_task() + # + # Notify all registered callbacks that a task is to be retried. + # + # This is a core-facing API and should not be called from the frontend + # + # Args: + # action_name: The name of the action, e.g. 'build' + # unique_id: The unique id of the plugin instance to look up + # + def retry_task(self, action_name: str, unique_id: str) -> None: + for cb in self._task_retry_cbs: + cb(action_name, unique_id) + # elapsed_time() # # Fetches the current session elapsed time diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index fb0b2f6e1..81c8a88ef 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1354,12 +1354,11 @@ class Stream: # failed task from the tasks group. # # Args: - # action_name (str): The name of the action being performed - # unique_id (str): A unique_id to load an Element instance + # action_name: The name of the action being performed + # unique_id: A unique_id to load an Element instance # - def _failure_retry(self, action_name, unique_id): - notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id) - self._notify(notification) + def _failure_retry(self, action_name: str, unique_id: str) -> None: + self._state.retry_task(action_name, unique_id) # _run() # -- cgit v1.2.1