summaryrefslogtreecommitdiff
path: root/buildstream/_frontend/app.py
diff options
context:
space:
mode:
Diffstat (limited to 'buildstream/_frontend/app.py')
-rw-r--r--buildstream/_frontend/app.py70
1 files changed, 33 insertions, 37 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 58a8eab89..de23c12b0 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -39,7 +39,6 @@ from .._project import Project
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
-from .._scheduler import Scheduler
from .._versions import BST_FORMAT_VERSION
from .. import __version__ as build_stream_version
from .. import _yaml
@@ -69,7 +68,6 @@ class App():
self.context = None # The Context object
self.stream = None # The Stream object
self.project = None # The toplevel Project object
- self.scheduler = None # The Scheduler
self.logger = None # The LogLine object
self.interactive = None # Whether we are running in interactive mode
self.colors = None # Whether to use colors in logging
@@ -83,6 +81,7 @@ class App():
self._status = None # The Status object
self._fail_messages = {} # Failure messages by unique plugin id
self._interactive_failures = None # Whether to handle failures interactively
+ self._started = False # Whether a session has started
# UI Colors Profiles
self._content_profile = Profile(fg='yellow')
@@ -202,14 +201,12 @@ class App():
self._error_exit(e, "Error loading project")
# Create the stream right away, we'll need to pass it around
- self.stream = Stream(self.context, self.project, self.loaded_cb)
-
- # Create the application's scheduler
- self.scheduler = Scheduler(self.context, self._session_start,
- interrupt_callback=self._interrupt_handler,
- ticker_callback=self._tick,
- job_start_callback=self._job_started,
- job_complete_callback=self._job_completed)
+ self.stream = Stream(self.context, self.project, self._session_start,
+ session_start_callback=self.session_start_cb,
+ interrupt_callback=self._interrupt_handler,
+ ticker_callback=self._tick,
+ job_start_callback=self._job_started,
+ job_complete_callback=self._job_completed)
# Create the logger right before setting the message handler
self.logger = LogLine(self.context,
@@ -224,8 +221,7 @@ class App():
self._status = Status(self.context,
self._content_profile, self._format_profile,
self._success_profile, self._error_profile,
- self.stream, self.scheduler,
- colors=self.colors)
+ self.stream, colors=self.colors)
# Propagate pipeline feedback to the user
self.context.set_message_handler(self._message_handler)
@@ -238,10 +234,6 @@ class App():
if session_name:
self._message(MessageType.START, session_name)
- # XXX This is going to change soon !
- #
- self.stream._scheduler = self.scheduler
-
# Run the body of the session here, once everything is loaded
try:
yield
@@ -249,14 +241,15 @@ class App():
# Print a nice summary if this is a session
if session_name:
- elapsed = self.scheduler.elapsed_time()
+ elapsed = self.stream.elapsed_time
if isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member
self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
else:
self._message(MessageType.FAIL, session_name, elapsed=elapsed)
- self._print_summary()
+ if self._started:
+ self._print_summary()
# Exit with the error
self._error_exit(e)
@@ -264,8 +257,9 @@ class App():
else:
# No exceptions occurred, print session time and summary
if session_name:
- self._message(MessageType.SUCCESS, session_name, elapsed=self.scheduler.elapsed_time())
- self._print_summary()
+ self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+ if self._started:
+ self._print_summary()
# init_project()
#
@@ -400,8 +394,8 @@ class App():
# If the scheduler has started, try to terminate all jobs gracefully,
# otherwise exit immediately.
- if self.scheduler.loop:
- self.scheduler.terminate_jobs()
+ if self.stream.running:
+ self.stream.terminate()
else:
sys.exit(-1)
@@ -411,8 +405,8 @@ class App():
def _maybe_render_status(self):
# If we're suspended or terminating, then dont render the status area
- if self._status and self.scheduler and \
- not (self.scheduler.suspended or self.scheduler.terminated):
+ if self._status and self.stream and \
+ not (self.stream.suspended or self.stream.terminated):
self._status.render()
#
@@ -423,7 +417,7 @@ class App():
# Only handle ^C interactively in interactive mode
if not self.interactive:
self._status.clear()
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
return
# Here we can give the user some choices, like whether they would
@@ -452,11 +446,11 @@ class App():
if choice == 'terminate':
click.echo("\nTerminating all jobs at user request\n", err=True)
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
else:
if choice == 'quit':
click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
- self.scheduler.stop_queueing()
+ self.stream.quit()
elif choice == 'continue':
click.echo("\nContinuing\n", err=True)
@@ -473,7 +467,7 @@ class App():
# Dont attempt to handle a failure if the user has already opted to
# terminate
- if not success and not self.scheduler.terminated:
+ if not success and not self.stream.terminated:
# Get the last failure message for additional context
failure = self._fail_messages.get(element._get_unique_id())
@@ -494,9 +488,9 @@ class App():
if not self._interactive_failures:
if self.context.sched_error_action == 'terminate':
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
elif self.context.sched_error_action == 'quit':
- self.scheduler.stop_queueing()
+ self.stream.quit()
elif self.context.sched_error_action == 'continue':
pass
return
@@ -551,11 +545,11 @@ class App():
if choice == 'terminate':
click.echo("\nTerminating all jobs\n", err=True)
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
else:
if choice == 'quit':
click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
- self.scheduler.stop_queueing()
+ self.stream.quit()
elif choice == 'continue':
click.echo("\nContinuing with other non failing elements\n", err=True)
elif choice == 'retry':
@@ -567,10 +561,12 @@ class App():
# Print the session heading if we've loaded a pipeline and there
# is going to be a session
#
- def loaded_cb(self, pipeline):
+ def session_start_cb(self):
+ self._started = True
if self._session_name:
- self.logger.print_heading(pipeline,
- self._main_options['log_file'],
+ self.logger.print_heading(self.project,
+ self.stream,
+ log_file=self._main_options['log_file'],
styling=self.colors)
#
@@ -578,7 +574,7 @@ class App():
#
def _print_summary(self):
click.echo("", err=True)
- self.logger.print_summary(self.stream, self.scheduler,
+ self.logger.print_summary(self.stream,
self._main_options['log_file'],
styling=self.colors)
@@ -642,7 +638,7 @@ class App():
def _interrupted(self):
self._status.clear()
try:
- with self.scheduler.jobs_suspended():
+ with self.stream.suspend():
yield
finally:
self._maybe_render_status()