summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-08-14 15:02:00 +0100
committerTom Pollard <tom.pollard@codethink.co.uk>2019-08-15 12:25:47 +0100
commitf807cf8c348c7daeb8101e079416374824ec9dfe (patch)
tree7de928fc89873917c0b18ee9194826354dfcc38e
parente52bc8e68601a17226c33821f979aef4c9368da9 (diff)
downloadbuildstream-tpollard/frontendelement.tar.gz
_frontend/app.py: Don't determine queue for retrying in frontendtpollard/frontendelement
When retrying a failed job Stream as the queue owner should handle the element enqueue and failed task deletion. After frontend process separation it will also not be plausible to pickle the queue object.
-rw-r--r--src/buildstream/_frontend/app.py16
-rw-r--r--src/buildstream/_stream.py20
2 files changed, 22 insertions, 14 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 918ee0f74..236e8f80b 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -567,12 +567,6 @@ class App():
# terminate
if not self.stream.terminated:
if element:
- # look-up queue
- for q in self.stream.queues:
- if q.action_name == action_name:
- queue = q
- assert queue, "Job action {} does not have a corresponding queue".format(action_name)
-
# Get the last failure message for additional context
failure = self._fail_messages.get(full_name)
@@ -584,14 +578,14 @@ class App():
"unable to retrieve failure message for element {}\n\n\n\n\n"
.format(full_name), err=True)
else:
- self._handle_failure(element, queue, failure, full_name)
+ self._handle_failure(element, action_name, failure, full_name)
else:
# Not an element_job, we don't handle the failure
click.echo("\nTerminating all jobs\n", err=True)
self.stream.terminate()
- def _handle_failure(self, element, queue, failure, full_name):
+ def _handle_failure(self, element, action_name, failure, full_name):
# Handle non interactive mode setting of what to do when a job fails.
if not self._interactive_failures:
@@ -669,7 +663,11 @@ class App():
elif choice == 'retry':
click.echo("\nRetrying failed job\n", err=True)
unique_id = element[0]
- self.stream._failure_retry(queue, unique_id)
+ try:
+ self.stream._failure_retry(action_name, unique_id)
+ except StreamError:
+ click.echo("Job action {} does not have a corresponding queue".format(action_name), err=True)
+ self.stream.terminate()
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 51bb1c7ab..44147fc76 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1289,15 +1289,25 @@ class Stream():
# _failure_retry()
#
- # Enqueues given element via unique_id to the specified queue and
- # remove the related failed task from the related group
- #
+ # Enqueues given element via unique_id to the specified queue
+ # matched against provided action_name & removes the related
+ # failed task from the tasks group.
#
# Args:
- # queue (Queue): The target queue
+ # action_name (str): The name of the action being performed
# unique_id (str): A unique_id to load an Element instance
#
- def _failure_retry(self, queue, unique_id):
+ # Raises:
+ # (StreamError): If the related queue cannot be found
+ #
+ def _failure_retry(self, action_name, unique_id):
+ queue = None
+ # Attempt to resolve the required queue
+ for queue in self.queues:
+ if queue.action_name == action_name:
+ queue = queue
+ if not queue:
+ raise StreamError()
element = Plugin._lookup(unique_id)
queue._task_group.failed_tasks.remove(element._get_full_name())
queue.enqueue([element])