summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Tom Pollard <tom.pollard@codethink.co.uk> 2019-08-28 17:15:44 +0100
committer: bst-marge-bot <marge-bot@buildstream.build> 2019-09-10 10:44:53 +0000
commit: aaa104bc2e95e973adb6b61276e38ed573bd97d2 (patch)
tree: 8497a98c66b65aa62f7daec14ad23f0567443910
parent: 1e1e7baeb7adf72ee1a206c58014d7b7eb753f1c (diff)
download: buildstream-aaa104bc2e95e973adb6b61276e38ed573bd97d2.tar.gz
scheduler.py: Notification for interactive failure retry
Add a notification for RETRY. This moves the retry handling into the scheduler, which will be running in the process that has been suspended for interactivity and as such will be able to load the relevant Element. Note that a job that failed via the scheduler should always have a related queue, so the try/except around matching the queue via the action name should not be needed.
-rw-r--r-- src/buildstream/_frontend/app.py 6
-rw-r--r-- src/buildstream/_scheduler/scheduler.py 16
-rw-r--r-- src/buildstream/_stream.py 17
3 files changed, 21 insertions, 18 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index f9729a7ce..45160afbc 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -663,11 +663,7 @@ class App():
elif choice == 'retry':
click.echo("\nRetrying failed job\n", err=True)
unique_id = element[0]
- try:
- self.stream._failure_retry(action_name, unique_id)
- except StreamError:
- click.echo("Job action {} does not have a corresponding queue".format(action_name), err=True)
- self.stream.terminate()
+ self.stream._failure_retry(action_name, unique_id)
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 3c491b92d..b892296f5 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -30,6 +30,7 @@ from .resources import Resources
from .jobs import JobStatus
from ..types import FastEnum
from .._profile import Topics, PROFILER
+from ..plugin import Plugin
# A decent return code for Scheduler.run()
@@ -59,6 +60,7 @@ class NotificationType(FastEnum):
SUSPEND = "suspend"
UNSUSPEND = "unsuspend"
SUSPENDED = "suspended"
+ RETRY = "retry"
# Notification()
@@ -497,6 +499,18 @@ class Scheduler():
self._notify(Notification(NotificationType.TICK))
self.loop.call_later(1, self._tick)
+ def _failure_retry(self, action_name, unique_id):
+ queue = None
+ for q in self.queues:
+ if q.action_name == action_name:
+ queue = q
+ break
+ # Assert queue found, we should only be retrying a queued job
+ assert queue
+ element = Plugin._lookup(unique_id)
+ queue._task_group.failed_tasks.remove(element._get_full_name())
+ queue.enqueue([element])
+
def _notify(self, notification):
# Scheduler to Stream notifcations on right side
self._notification_queue.append(notification)
@@ -512,6 +526,8 @@ class Scheduler():
self.jobs_suspended()
elif notification.notification_type == NotificationType.UNSUSPEND:
self.jobs_unsuspended()
+ elif notification.notification_type == NotificationType.RETRY:
+ self._failure_retry(notification.job_action, notification.element)
else:
# Do not raise exception once scheduler process is separated
# as we don't want to pickle exceptions between processes
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e3d5eee89..6d8d918dd 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1338,20 +1338,11 @@ class Stream():
# action_name (str): The name of the action being performed
# unique_id (str): A unique_id to load an Element instance
#
- # Raises:
- # (StreamError): If the related queue cannot be found
- #
def _failure_retry(self, action_name, unique_id):
- queue = None
- # Attempt to resolve the required queue
- for queue in self.queues:
- if queue.action_name == action_name:
- queue = queue
- if not queue:
- raise StreamError()
- element = Plugin._lookup(unique_id)
- queue._task_group.failed_tasks.remove(element._get_full_name())
- queue.enqueue([element])
+ notification = Notification(NotificationType.RETRY,
+ job_action=action_name,
+ element=unique_id)
+ self._notify(notification)
# _run()
#