summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <contact@benschubert.me>2020-07-04 17:32:18 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2020-07-06 18:07:53 +0000
commit46556cb03d4f5ddfd36cada6c92c5030e45f3447 (patch)
tree1f5592f7d081521c550679ab95da1cead3537afa
parent472e3d165df3f9866c0665d4f3ffa19f864d33a4 (diff)
downloadbuildstream-46556cb03d4f5ddfd36cada6c92c5030e45f3447.tar.gz
scheduler.py: Pass all 'retry' operations through the state
Stop using 'Notifications' for retries, the state is the one handling the callbacks required for every change in status of elements
-rw-r--r--src/buildstream/_scheduler/scheduler.py4
-rw-r--r--src/buildstream/_state.py32
-rw-r--r--src/buildstream/_stream.py9
3 files changed, 37 insertions, 8 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 38d656623..1a4ac2e2c 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -63,7 +63,6 @@ class NotificationType(FastEnum):
SUSPEND = "suspend"
UNSUSPEND = "unsuspend"
SUSPENDED = "suspended"
- RETRY = "retry"
# Notification()
@@ -146,6 +145,7 @@ class Scheduler:
self._notifier = notifier
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
+ self._state.register_task_retry_callback(self._failure_retry)
# run()
#
@@ -583,8 +583,6 @@ class Scheduler:
self.jobs_suspended()
elif notification.notification_type == NotificationType.UNSUSPEND:
self.jobs_unsuspended()
- elif notification.notification_type == NotificationType.RETRY:
- self._failure_retry(notification.job_action, notification.element)
else:
# Do not raise exception once scheduler process is separated
# as we don't want to pickle exceptions between processes
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index b8479e2b9..6e08c004d 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -112,6 +112,7 @@ class State:
self._task_changed_cbs = []
self._task_groups_changed_cbs = []
self._task_failed_cbs = []
+ self._task_retry_cbs = []
#####################################
# Frontend-facing notification APIs #
@@ -226,6 +227,23 @@ class State:
def unregister_task_failed_callback(self, callback):
self._task_failed_cbs.remove(callback)
+ # register_task_retry_callback()
+ #
+ # Registers a callback to be notified when a task is to be retried
+ #
+ # Args:
+ # callback (function): The callback to be notified
+ #
+ # Callback Args:
+ # action_name (str): The name of the action, e.g. 'build'
+ # full_name (str): The full name of the task, distinguishing
+ # it from other tasks with the same action name
+ # e.g. an element's name.
+ # element_job (bool): (optionally) If an element job failed.
+ #
+ def register_task_retry_callback(self, callback):
+ self._task_retry_cbs.append(callback)
+
##############################################
# Core-facing APIs for driving notifications #
##############################################
@@ -336,6 +354,20 @@ class State:
for cb in self._task_failed_cbs:
cb(action_name, full_name, element)
+ # retry_task()
+ #
+ # Notify all registered callbacks that a task is to be retried.
+ #
+ # This is a core-facing API and should not be called from the frontend
+ #
+ # Args:
+ # action_name: The name of the action, e.g. 'build'
+ # unique_id: The unique id of the plugin instance to look up
+ #
+ def retry_task(self, action_name: str, unique_id: str) -> None:
+ for cb in self._task_retry_cbs:
+ cb(action_name, unique_id)
+
# elapsed_time()
#
# Fetches the current session elapsed time
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index fb0b2f6e1..81c8a88ef 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1354,12 +1354,11 @@ class Stream:
# failed task from the tasks group.
#
# Args:
- # action_name (str): The name of the action being performed
- # unique_id (str): A unique_id to load an Element instance
+ # action_name: The name of the action being performed
+ # unique_id: A unique_id to load an Element instance
#
- def _failure_retry(self, action_name, unique_id):
- notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id)
- self._notify(notification)
+ def _failure_retry(self, action_name: str, unique_id: str) -> None:
+ self._state.retry_task(action_name, unique_id)
# _run()
#