summaryrefslogtreecommitdiff
path: root/src/buildstream/_frontend/app.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_frontend/app.py')
-rw-r--r--src/buildstream/_frontend/app.py65
1 files changed, 42 insertions, 23 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 9550fea40..cf4ac2b8d 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -31,6 +31,7 @@ from .. import Scope
# Import various buildstream internals
from .._context import Context
+from ..plugin import Plugin
from .._platform import Platform
from .._project import Project
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
@@ -38,7 +39,6 @@ from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
from .._versions import BST_FORMAT_VERSION
from .. import _yaml
-from .._scheduler import ElementJob, JobStatus
# Import frontend assets
from .profile import Profile
@@ -82,6 +82,7 @@ class App():
self._interactive_failures = None # Whether to handle failures interactively
self._started = False # Whether a session has started
self._set_project_dir = False # Whether -C option was used
+ self._state = None # Frontend reads this and registers callbacks
# UI Colors Profiles
self._content_profile = Profile(fg='yellow')
@@ -203,8 +204,19 @@ class App():
except BstError as e:
self._error_exit(e, "Error instantiating platform")
+ # Create the stream right away, we'll need to pass it around.
+ self.stream = Stream(self.context, self._session_start,
+ session_start_callback=self.session_start_cb,
+ interrupt_callback=self._interrupt_handler,
+ ticker_callback=self._tick)
+
+ self._state = self.stream.get_state()
+
+ # Register callbacks with the State
+ self._state.register_task_failed_callback(self._job_failed)
+
# Create the logger right before setting the message handler
- self.logger = LogLine(self.context,
+ self.logger = LogLine(self.context, self._state,
self._content_profile,
self._format_profile,
self._success_profile,
@@ -226,16 +238,11 @@ class App():
# we can override the global exception hook.
sys.excepthook = self._global_exception_handler
- # Create the stream right away, we'll need to pass it around
- self.stream = Stream(self.context, self._session_start,
- session_start_callback=self.session_start_cb,
- interrupt_callback=self._interrupt_handler,
- ticker_callback=self._tick,
- job_start_callback=self._job_started,
- job_complete_callback=self._job_completed)
+ # Initialize the parts of Stream that have side-effects
+ self.stream.init()
# Create our status printer, only available in interactive
- self._status = Status(self.context,
+ self._status = Status(self.context, self._state,
self._content_profile, self._format_profile,
self._success_profile, self._error_profile,
self.stream, colors=self.colors)
@@ -536,21 +543,31 @@ class App():
def _tick(self):
self._maybe_render_status()
- def _job_started(self, job):
- self._status.add_job(job)
- self._maybe_render_status()
-
- def _job_completed(self, job, status):
- self._status.remove_job(job)
- self._maybe_render_status()
-
+ # Callback that a job has failed
+ #
+ # XXX: This accesses the core directly, which is discouraged.
+ # Removing use of the core would require delegating to Shell
+ # the creation of an interactive shell, and the retrying of jobs.
+ #
+ # Args:
+ # action_name (str): The name of the action being performed,
+ # same as the task group, if it exists
+ # full_name (str): The name of this specific task, e.g. the element name
+ # unique_id (int): If an element job failed, the unique ID of the element.
+ #
+ def _job_failed(self, action_name, full_name, unique_id=None):
# Dont attempt to handle a failure if the user has already opted to
# terminate
- if status is JobStatus.FAIL and not self.stream.terminated:
+ if not self.stream.terminated:
+ if unique_id:
+ # look-up queue
+ for q in self.stream.queues:
+ if q.action_name == action_name:
+ queue = q
+ assert queue, "Job action {} does not have a corresponding queue".format(action_name)
- if isinstance(job, ElementJob):
- element = job.element
- queue = job.queue
+ # look-up element
+ element = Plugin._lookup(unique_id)
# Get the last failure message for additional context
failure = self._fail_messages.get(element._unique_id)
@@ -564,6 +581,7 @@ class App():
.format(element), err=True)
else:
self._handle_failure(element, queue, failure)
+
else:
click.echo("\nTerminating all jobs\n", err=True)
self.stream.terminate()
@@ -643,7 +661,8 @@ class App():
click.echo("\nContinuing with other non failing elements\n", err=True)
elif choice == 'retry':
click.echo("\nRetrying failed job\n", err=True)
- queue.failed_elements.remove(element)
+ # FIXME: Outstandingly nasty modification of core state
+ queue._task_group.failed_tasks.remove(element._get_full_name())
queue.enqueue([element])
#