diff options
Diffstat (limited to 'src/buildstream/_frontend/app.py')
-rw-r--r-- | src/buildstream/_frontend/app.py | 65 |
1 files changed, 42 insertions, 23 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index 9550fea40..cf4ac2b8d 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -31,6 +31,7 @@ from .. import Scope # Import various buildstream internals from .._context import Context +from ..plugin import Plugin from .._platform import Platform from .._project import Project from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError @@ -38,7 +39,6 @@ from .._message import Message, MessageType, unconditional_messages from .._stream import Stream from .._versions import BST_FORMAT_VERSION from .. import _yaml -from .._scheduler import ElementJob, JobStatus # Import frontend assets from .profile import Profile @@ -82,6 +82,7 @@ class App(): self._interactive_failures = None # Whether to handle failures interactively self._started = False # Whether a session has started self._set_project_dir = False # Whether -C option was used + self._state = None # Frontend reads this and registers callbacks # UI Colors Profiles self._content_profile = Profile(fg='yellow') @@ -203,8 +204,19 @@ class App(): except BstError as e: self._error_exit(e, "Error instantiating platform") + # Create the stream right away, we'll need to pass it around. + self.stream = Stream(self.context, self._session_start, + session_start_callback=self.session_start_cb, + interrupt_callback=self._interrupt_handler, + ticker_callback=self._tick) + + self._state = self.stream.get_state() + + # Register callbacks with the State + self._state.register_task_failed_callback(self._job_failed) + # Create the logger right before setting the message handler - self.logger = LogLine(self.context, + self.logger = LogLine(self.context, self._state, self._content_profile, self._format_profile, self._success_profile, @@ -226,16 +238,11 @@ class App(): # we can override the global exception hook. sys.excepthook = self._global_exception_handler - # Create the stream right away, we'll need to pass it around - self.stream = Stream(self.context, self._session_start, - session_start_callback=self.session_start_cb, - interrupt_callback=self._interrupt_handler, - ticker_callback=self._tick, - job_start_callback=self._job_started, - job_complete_callback=self._job_completed) + # Initialize the parts of Stream that have side-effects + self.stream.init() # Create our status printer, only available in interactive - self._status = Status(self.context, + self._status = Status(self.context, self._state, self._content_profile, self._format_profile, self._success_profile, self._error_profile, self.stream, colors=self.colors) @@ -536,21 +543,31 @@ class App(): def _tick(self): self._maybe_render_status() - def _job_started(self, job): - self._status.add_job(job) - self._maybe_render_status() - - def _job_completed(self, job, status): - self._status.remove_job(job) - self._maybe_render_status() - + # Callback that a job has failed + # + # XXX: This accesses the core directly, which is discouraged. + # Removing use of the core would require delegating to Shell + # the creation of an interactive shell, and the retrying of jobs. + # + # Args: + # action_name (str): The name of the action being performed, + # same as the task group, if it exists + # full_name (str): The name of this specific task, e.g. the element name + # unique_id (int): If an element job failed, the unique ID of the element. + # + def _job_failed(self, action_name, full_name, unique_id=None): # Dont attempt to handle a failure if the user has already opted to # terminate - if status is JobStatus.FAIL and not self.stream.terminated: + if not self.stream.terminated: + if unique_id: + # look-up queue + for q in self.stream.queues: + if q.action_name == action_name: + queue = q + assert queue, "Job action {} does not have a corresponding queue".format(action_name) - if isinstance(job, ElementJob): - element = job.element - queue = job.queue + # look-up element + element = Plugin._lookup(unique_id) # Get the last failure message for additional context failure = self._fail_messages.get(element._unique_id) @@ -564,6 +581,7 @@ class App(): .format(element), err=True) else: self._handle_failure(element, queue, failure) + else: click.echo("\nTerminating all jobs\n", err=True) self.stream.terminate() @@ -643,7 +661,8 @@ class App(): click.echo("\nContinuing with other non failing elements\n", err=True) elif choice == 'retry': click.echo("\nRetrying failed job\n", err=True) - queue.failed_elements.remove(element) + # FIXME: Outstandingly nasty modification of core state + queue._task_group.failed_tasks.remove(element._get_full_name()) queue.enqueue([element]) # |