diff options
author | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-08-14 15:02:00 +0100 |
---|---|---|
committer | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-08-15 12:25:47 +0100 |
commit | f807cf8c348c7daeb8101e079416374824ec9dfe (patch) | |
tree | 7de928fc89873917c0b18ee9194826354dfcc38e | |
parent | e52bc8e68601a17226c33821f979aef4c9368da9 (diff) | |
download | buildstream-tpollard/frontendelement.tar.gz |
_frontend/app.py: Don't determine queue for retrying in frontendtpollard/frontendelement
When retrying a failed job Stream as the queue owner should handle
the element enqueue and failed task deletion. After frontend process
separation it will also not be plausible to pickle the queue object.
-rw-r--r-- | src/buildstream/_frontend/app.py | 16 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 20 |
2 files changed, 22 insertions, 14 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 918ee0f74..236e8f80b 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -567,12 +567,6 @@ class App(): # terminate if not self.stream.terminated: if element: - # look-up queue - for q in self.stream.queues: - if q.action_name == action_name: - queue = q - assert queue, "Job action {} does not have a corresponding queue".format(action_name) - # Get the last failure message for additional context failure = self._fail_messages.get(full_name) @@ -584,14 +578,14 @@ class App(): "unable to retrieve failure message for element {}\n\n\n\n\n" .format(full_name), err=True) else: - self._handle_failure(element, queue, failure, full_name) + self._handle_failure(element, action_name, failure, full_name) else: # Not an element_job, we don't handle the failure click.echo("\nTerminating all jobs\n", err=True) self.stream.terminate() - def _handle_failure(self, element, queue, failure, full_name): + def _handle_failure(self, element, action_name, failure, full_name): # Handle non interactive mode setting of what to do when a job fails. if not self._interactive_failures: @@ -669,7 +663,11 @@ class App(): elif choice == 'retry': click.echo("\nRetrying failed job\n", err=True) unique_id = element[0] - self.stream._failure_retry(queue, unique_id) + try: + self.stream._failure_retry(action_name, unique_id) + except StreamError: + click.echo("Job action {} does not have a corresponding queue".format(action_name), err=True) + self.stream.terminate() # # Print the session heading if we've loaded a pipeline and there diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 51bb1c7ab..44147fc76 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1289,15 +1289,25 @@ class Stream(): # _failure_retry() # - # Enqueues given element via unique_id to the specified queue and - # remove the related failed task from the related group - # + # Enqueues given element via unique_id to the specified queue + # matched against provided action_name & removes the related + # failed task from the tasks group. # # Args: - # queue (Queue): The target queue + # action_name (str): The name of the action being performed # unique_id (str): A unique_id to load an Element instance # - def _failure_retry(self, queue, unique_id): + # Raises: + # (StreamError): If the related queue cannot be found + # + def _failure_retry(self, action_name, unique_id): + queue = None + # Attempt to resolve the required queue + for queue in self.queues: + if queue.action_name == action_name: + queue = queue + if not queue: + raise StreamError() element = Plugin._lookup(unique_id) queue._task_group.failed_tasks.remove(element._get_full_name()) queue.enqueue([element]) |