diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-04-01 20:42:50 +0900 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2018-04-01 20:50:59 +0900 |
commit | 63d9946357d655b0af56202d4fba37ae3f8c5f06 (patch) | |
tree | b310f8db43c6a9832869094e9de2bb31c76bb393 | |
parent | b9546d4de7447e2f013cd263151d5d4a596e2dfe (diff) | |
download | buildstream-63d9946357d655b0af56202d4fba37ae3f8c5f06.tar.gz |
Include initialization time in the total session time of the build log
This patch refactors the frontend App object and touches a few internals.
o The scheduler now takes a start time given to it at instantiation time,
instead of considering the start time to commence in Scheduler.run()
o The App.initialize() method has been swapped out for an
App.initialized() context manager.
This context manager now takes care of the main start/fail/success
messages. For convenience and ensured consistency, this context manager
is now responsible for:
o Printing the startup heading
o Printing the end of session summary
o Exiting with a consistent error status in case of errors
o The Pipeline() object no longer prints the start/fail/success messages,
as they are now taken care of by the App()
o The cli.py frontend code is much simplified by using the new context
manager, also enforcing consistency in how we handle and report errors.
This fixes issue #186
-rw-r--r-- | buildstream/_exceptions.py | 4 | ||||
-rw-r--r-- | buildstream/_frontend/cli.py | 145 | ||||
-rw-r--r-- | buildstream/_frontend/main.py | 90 | ||||
-rw-r--r-- | buildstream/_pipeline.py | 83 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 6 |
5 files changed, 140 insertions, 188 deletions
diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index 8a4f13aea..fb42d9fd2 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -220,7 +220,7 @@ class ArtifactError(BstError): # class PipelineError(BstError): - def __init__(self, message=None, *, detail=None, reason=None): + def __init__(self, message=None, *, detail=None, reason=None, terminated=False): # The empty string should never appear to a user, # this only allows us to treat this internal error as @@ -229,3 +229,5 @@ class PipelineError(BstError): message = "" super().__init__(message, detail=detail, domain=ErrorDomain.PIPELINE, reason=reason) + + self.terminated = terminated diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py index c6c9d8156..5efa97b7e 100644 --- a/buildstream/_frontend/cli.py +++ b/buildstream/_frontend/cli.py @@ -3,7 +3,7 @@ import sys import click from .. import _yaml -from .._exceptions import BstError, PipelineError, LoadError +from .._exceptions import BstError, LoadError from ..__version__ import __version__ as build_stream_version from .complete import main_bashcomplete, complete_path, CompleteUnhandled @@ -201,17 +201,10 @@ def build(app, elements, all_, track_, track_save, track_all, track_except): if track_: rewritable = True - app.initialize(elements, except_=track_except, rewritable=rewritable, - use_configured_remote_caches=True, track_elements=track_, - fetch_subprojects=True) - app.print_heading() - try: + with app.initialized(elements, session_name="Build", except_=track_except, rewritable=rewritable, + use_configured_remote_caches=True, track_elements=track_, + fetch_subprojects=True): app.pipeline.build(app.scheduler, all_, track_) - app.print_summary() - except PipelineError as e: - app.print_error(e) - app.print_summary() - sys.exit(-1) ################################################################## @@ -244,19 +237,11 @@ def fetch(app, elements, deps, track_, except_): plan: Only dependencies required for the build plan all: All dependencies """ - - app.initialize(elements, except_=except_, rewritable=track_, - track_elements=elements if track_ else None, - fetch_subprojects=True) - try: + with app.initialized(elements, session_name="Fetch", except_=except_, rewritable=track_, + track_elements=elements if track_ else None, + fetch_subprojects=True): dependencies = app.pipeline.deps_elements(deps) - app.print_heading(deps=dependencies) app.pipeline.fetch(app.scheduler, dependencies, track_) - app.print_summary() - except PipelineError as e: - app.print_error(e) - app.print_summary() - sys.exit(-1) ################################################################## @@ -285,17 +270,10 @@ def track(app, elements, deps, except_): none: No dependencies, just the element itself all: All dependencies """ - app.initialize(elements, except_=except_, rewritable=True, track_elements=elements, - fetch_subprojects=True) - try: + with app.initialized(elements, session_name="Track", except_=except_, rewritable=True, + track_elements=elements, fetch_subprojects=True): dependencies = app.pipeline.deps_elements(deps) - app.print_heading(deps=dependencies) app.pipeline.track(app.scheduler, dependencies) - app.print_summary() - except PipelineError as e: - app.print_error(e) - app.print_summary() - sys.exit(-1) ################################################################## @@ -323,16 +301,10 @@ def pull(app, elements, deps, remote): none: No dependencies, just the element itself all: All dependencies """ - app.initialize(elements, use_configured_remote_caches=(remote is None), - add_remote_cache=remote, fetch_subprojects=True) - try: + with app.initialized(elements, session_name="Pull", use_configured_remote_caches=(remote is None), + add_remote_cache=remote, fetch_subprojects=True): to_pull = app.pipeline.deps_elements(deps) app.pipeline.pull(app.scheduler, to_pull) - app.print_summary() - except BstError as e: - app.print_error(e) - app.print_summary() - sys.exit(-1) ################################################################## @@ -359,16 +331,11 @@ def push(app, elements, deps, remote): none: No dependencies, just the element itself all: All dependencies """ - app.initialize(elements, use_configured_remote_caches=(remote is None), - add_remote_cache=remote, fetch_subprojects=True) - try: + with app.initialized(elements, session_name="Push", + use_configured_remote_caches=(remote is None), + add_remote_cache=remote, fetch_subprojects=True): to_push = app.pipeline.deps_elements(deps) app.pipeline.push(app.scheduler, to_push) - app.print_summary() - except BstError as e: - app.print_error(e) - app.print_summary() - sys.exit(-1) ################################################################## @@ -439,20 +406,17 @@ def show(app, elements, deps, except_, order, format_, downloadable): bst show target.bst --format \\ $'---------- %{name} ----------\\n%{vars}' """ - app.initialize(elements, except_=except_, use_configured_remote_caches=downloadable) - try: + with app.initialized(elements, except_=except_, use_configured_remote_caches=downloadable): + dependencies = app.pipeline.deps_elements(deps) - except PipelineError as e: - click.echo("{}".format(e), err=True) - sys.exit(-1) + if order == "alpha": + dependencies = sorted(dependencies) - if order == "alpha": - dependencies = sorted(dependencies) + if not format_: + format_ = app.context.log_element_format - if not format_: - format_ = app.context.log_element_format + report = app.logger.show_pipeline(dependencies, format_) - report = app.logger.show_pipeline(dependencies, format_) click.echo(report, color=app.colors) @@ -498,7 +462,8 @@ def shell(app, element, sysroot, mount, isolate, build_, command): else: scope = Scope.RUN - app.initialize((element,)) + with app.initialized((element,)): + pass # Assert we have everything we need built. missing_deps = [] @@ -548,13 +513,8 @@ def shell(app, element, sysroot, mount, isolate, build_, command): def checkout(app, element, directory, force, integrate, hardlinks): """Checkout a built artifact to the specified directory """ - app.initialize((element,)) - try: + with app.initialized((element,)): app.pipeline.checkout(directory, force, integrate, hardlinks) - click.echo("", err=True) - except BstError as e: - app.print_error(e) - sys.exit(-1) ################################################################## @@ -578,18 +538,12 @@ def checkout(app, element, directory, force, integrate, hardlinks): @click.pass_obj def source_bundle(app, target, force, directory, track_, compression, except_): - """Produce a source bundle to be manually executed""" - app.initialize((target,), rewritable=track_, track_elements=[target] if track_ else None) - try: + """Produce a source bundle to be manually executed + """ + with app.initialized((target,), rewritable=track_, track_elements=[target] if track_ else None): dependencies = app.pipeline.deps_elements('all') - app.print_heading(dependencies) app.pipeline.source_bundle(app.scheduler, dependencies, force, track_, compression, directory) - click.echo("", err=True) - except BstError as e: - click.echo("", err=True) - click.echo("ERROR: {}".format(e), err=True) - sys.exit(-1) ################################################################## @@ -618,13 +572,8 @@ def workspace(): def workspace_open(app, no_checkout, force, track_, element, directory): """Open a workspace for manual source modification""" - app.initialize((element,), rewritable=track_, track_elements=[element] if track_ else None) - try: + with app.initialized((element,), rewritable=track_, track_elements=[element] if track_ else None): app.pipeline.open_workspace(app.scheduler, directory, no_checkout, track_, force) - click.echo("", err=True) - except BstError as e: - app.print_error(e) - sys.exit(-1) ################################################################## @@ -639,24 +588,18 @@ def workspace_open(app, no_checkout, force, track_, element, directory): def workspace_close(app, remove_dir, element): """Close a workspace""" - app.initialize((element,)) + with app.initialized((element,)): - if app.pipeline.project._workspaces.get_workspace(app.pipeline.targets[0]) is None: - click.echo("ERROR: Workspace '{}' does not exist".format(element), err=True) - sys.exit(-1) - - if app.interactive and remove_dir: - if not click.confirm('This will remove all your changes, are you sure?'): - click.echo('Aborting', err=True) + if app.pipeline.project._workspaces.get_workspace(app.pipeline.targets[0]) is None: + click.echo("ERROR: Workspace '{}' does not exist".format(element), err=True) sys.exit(-1) - try: + if app.interactive and remove_dir: + if not click.confirm('This will remove all your changes, are you sure?'): + click.echo('Aborting', err=True) + sys.exit(-1) + app.pipeline.close_workspace(remove_dir) - click.echo("", err=True) - except BstError as e: - click.echo("", err=True) - click.echo("ERROR: {}".format(e), err=True) - sys.exit(-1) ################################################################## @@ -672,19 +615,13 @@ def workspace_close(app, remove_dir, element): @click.pass_obj def workspace_reset(app, track_, no_checkout, element): """Reset a workspace to its original state""" - app.initialize((element,)) - if app.interactive: - if not click.confirm('This will remove all your changes, are you sure?'): - click.echo('Aborting', err=True) - sys.exit(-1) + with app.initialized((element,)): + if app.interactive: + if not click.confirm('This will remove all your changes, are you sure?'): + click.echo('Aborting', err=True) + sys.exit(-1) - try: app.pipeline.reset_workspace(app.scheduler, track_, no_checkout) - click.echo("", err=True) - except BstError as e: - click.echo("", err=True) - click.echo("ERROR: {}".format(e), err=True) - sys.exit(-1) ################################################################## diff --git a/buildstream/_frontend/main.py b/buildstream/_frontend/main.py index fd2384ec3..5ec6884a0 100644 --- a/buildstream/_frontend/main.py +++ b/buildstream/_frontend/main.py @@ -21,6 +21,7 @@ import os import sys import resource +import datetime from contextlib import contextmanager from blessings import Terminal @@ -33,8 +34,8 @@ from .. import Scope # Import various buildstream internals from .._context import Context from .._project import Project -from .._exceptions import BstError -from .._message import MessageType, unconditional_messages +from .._exceptions import BstError, PipelineError +from .._message import Message, MessageType, unconditional_messages from .._pipeline import Pipeline from .._scheduler import Scheduler from .._profile import Topics, profile_start, profile_end @@ -50,10 +51,14 @@ INDENT = 4 ################################################################## # Main Application State # ################################################################## - class App(): def __init__(self, main_options): + + # Snapshot the start time of the session at the earliest opportunity, + # this is used for inclusive session time logging + self.session_start = datetime.datetime.now() + self.main_options = main_options self.logger = None self.status = None @@ -104,13 +109,39 @@ class App(): # Set soft limit to hard limit resource.setrlimit(resource.RLIMIT_NOFILE, (limits[1], limits[1])) + # initialized() # - # Initialize the main pipeline + # Context manager to initialize the application and optionally run a session + # within the context manager. # - def initialize(self, elements, except_=tuple(), rewritable=False, - use_configured_remote_caches=False, add_remote_cache=None, - track_elements=None, fetch_subprojects=False): - + # This context manager will take care of catching errors from within the + # context and report them consistently, so the CLI need not take care of + # reporting the errors and exiting with a consistent error status. + # + # Args: + # elements (list of elements): The elements to load recursively + # session_name (str): The name of the session, or None for no session + # except_ (list of elements): The elements to except + # rewritable (bool): Whether we should load the YAML files for roundtripping + # use_configured_remote_caches (bool): Whether we should contact remotes + # add_remote_cache (str): The URL for an explicitly mentioned remote cache + # track_elements (list of elements): Elements which are to be tracked + # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the + # loading process, if they are not yet locally cached + # + # Note that the except_ argument may have a subtly different meaning depending + # on the activity performed on the Pipeline. In normal circumstances the except_ + # argument excludes elements from the `elements` list. In a build session, the + # except_ elements are excluded from the tracking plan. + # + # If a session_name is provided, we treat the block as a session, and print + # the session header and summary, and time the main session from startup time. + # + @contextmanager + def initialized(self, elements, *, session_name=None, + except_=tuple(), rewritable=False, + use_configured_remote_caches=False, add_remote_cache=None, + track_elements=None, fetch_subprojects=False): profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) directory = self.main_options['directory'] @@ -150,7 +181,7 @@ class App(): self.interactive_failures = False # Create the application's scheduler - self.scheduler = Scheduler(self.context, + self.scheduler = Scheduler(self.context, self.session_start, interrupt_callback=self.interrupt_handler, ticker_callback=self.tick, job_start_callback=self.job_started, @@ -176,6 +207,9 @@ class App(): # Propagate pipeline feedback to the user self.context._set_message_handler(self.message_handler) + if session_name: + self.message(MessageType.START, session_name) + try: self.project = Project(directory, self.context, cli_options=self.main_options['option']) except BstError as e: @@ -209,6 +243,44 @@ class App(): profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements)) + # Print the heading + if session_name: + self.print_heading() + + # Run the body of the session here, once everything is loaded + try: + yield + + # Catch the error outside of the session timer and summarize what happened + except BstError as e: + elapsed = self.scheduler.elapsed_time() + + if session_name: + if isinstance(e, PipelineError) and e.terminated: # pylint: disable=no-member + self.message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed) + else: + self.message(MessageType.FAIL, session_name, elapsed=elapsed) + + self.print_error(e) + + if session_name: + self.print_summary() + + sys.exit(-1) + + # No exceptions occurred, print the summary + else: + if session_name: + self.message(MessageType.SUCCESS, session_name, elapsed=self.scheduler.elapsed_time()) + self.print_summary() + + # Local message propagator + # + def message(self, message_type, message, **kwargs): + args = dict(kwargs) + self.context._message( + Message(None, message_type, message, **args)) + # # Render the status area, conditional on some internal state # diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index c1f2afdef..59219ab11 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -379,22 +379,11 @@ class Pipeline(): self.assert_junction_tracking(dependencies, build=False) - self.message(MessageType.START, "Starting track") - elapsed, status = scheduler.run([track]) - changed = len(track.processed_elements) - + _, status = scheduler.run([track]) if status == SchedStatus.ERROR: - self.message(MessageType.FAIL, "Track failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(MessageType.WARN, - "Terminated after updating {} source references".format(changed), - elapsed=elapsed) - raise PipelineError() - else: - self.message(MessageType.SUCCESS, - "Updated {} source references".format(changed), - elapsed=elapsed) + raise PipelineError(terminated=True) # fetch() # @@ -430,22 +419,11 @@ class Pipeline(): fetch.enqueue(plan) queues = [fetch] - self.message(MessageType.START, "Fetching {} elements".format(len(plan))) - elapsed, status = scheduler.run(queues) - fetched = len(fetch.processed_elements) - + _, status = scheduler.run(queues) if status == SchedStatus.ERROR: - self.message(MessageType.FAIL, "Fetch failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(MessageType.WARN, - "Terminated after fetching {} elements".format(fetched), - elapsed=elapsed) - raise PipelineError() - else: - self.message(MessageType.SUCCESS, - "Fetched {} elements".format(fetched), - elapsed=elapsed) + raise PipelineError(terminated=True) def get_elements_to_track(self, track_targets): planner = Planner() @@ -526,17 +504,11 @@ class Pipeline(): self.session_elements = len(track_plan) + len(plan) - self.message(MessageType.START, "Starting build") - elapsed, status = scheduler.run(queues) - + _, status = scheduler.run(queues) if status == SchedStatus.ERROR: - self.message(MessageType.FAIL, "Build failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(MessageType.WARN, "Terminated", elapsed=elapsed) - raise PipelineError() - else: - self.message(MessageType.SUCCESS, "Build Complete", elapsed=elapsed) + raise PipelineError(terminated=True) # checkout() # @@ -646,20 +618,11 @@ class Pipeline(): if queues: queues[0].enqueue(plan) - elapsed, status = scheduler.run(queues) - fetched = len(fetch.processed_elements) - + _, status = scheduler.run(queues) if status == SchedStatus.ERROR: - self.message(MessageType.FAIL, "Tracking failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(MessageType.WARN, - "Terminated after fetching {} elements".format(fetched), - elapsed=elapsed) - raise PipelineError() - else: - self.message(MessageType.SUCCESS, - "Fetched {} elements".format(fetched), elapsed=elapsed) + raise PipelineError(terminated=True) if not no_checkout and target._consistency() != Consistency.CACHED: raise PipelineError("Could not stage uncached source. " + @@ -767,22 +730,11 @@ class Pipeline(): pull.enqueue(plan) queues = [pull] - self.message(MessageType.START, "Pulling {} artifacts".format(len(plan))) - elapsed, status = scheduler.run(queues) - pulled = len(pull.processed_elements) - + _, status = scheduler.run(queues) if status == SchedStatus.ERROR: - self.message(MessageType.FAIL, "Pull failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(MessageType.WARN, - "Terminated after pulling {} elements".format(pulled), - elapsed=elapsed) - raise PipelineError() - else: - self.message(MessageType.SUCCESS, - "Pulled {} complete".format(pulled), - elapsed=elapsed) + raise PipelineError(terminated=True) # push() # @@ -805,22 +757,11 @@ class Pipeline(): push.enqueue(plan) queues = [push] - self.message(MessageType.START, "Pushing {} artifacts".format(len(plan))) - elapsed, status = scheduler.run(queues) - pushed = len(push.processed_elements) - + _, status = scheduler.run(queues) if status == SchedStatus.ERROR: - self.message(MessageType.FAIL, "Push failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(MessageType.WARN, - "Terminated after pushing {} elements".format(pushed), - elapsed=elapsed) - raise PipelineError() - else: - self.message(MessageType.SUCCESS, - "Pushed {} complete".format(pushed), - elapsed=elapsed) + raise PipelineError(terminated=True) # remove_elements(): # diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 574cabc83..84f538f3d 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -51,6 +51,7 @@ class SchedStatus(): # # Args: # context: The Context in the parent scheduling process +# start_time: The time at which the session started # interrupt_callback: A callback to handle ^C # ticker_callback: A callback call once per second # job_start_callback: A callback call when each job starts @@ -59,6 +60,7 @@ class SchedStatus(): class Scheduler(): def __init__(self, context, + start_time, interrupt_callback=None, ticker_callback=None, job_start_callback=None, @@ -70,7 +72,7 @@ class Scheduler(): self.job_complete_callback = job_complete_callback self.context = context self.queues = None - self.starttime = None + self.starttime = start_time self.suspendtime = None # Initialize task tokens with the number allowed by @@ -102,8 +104,6 @@ class Scheduler(): # def run(self, queues): - self.starttime = datetime.datetime.now() - # Attach the queues self.queues = queues for queue in queues: |