From cee2710aef51b32c2fc824ae6971acd610cd74ae Mon Sep 17 00:00:00 2001 From: Tristan Maat Date: Fri, 20 Oct 2017 16:27:18 +0100 Subject: Issue #117: Add `bst build --track` --- buildstream/_frontend/main.py | 49 +++++++++------ buildstream/_pipeline.py | 137 ++++++++++++++++++++++++++++-------------- 2 files changed, 121 insertions(+), 65 deletions(-) diff --git a/buildstream/_frontend/main.py b/buildstream/_frontend/main.py index 15fcf4b34..b25e398c7 100644 --- a/buildstream/_frontend/main.py +++ b/buildstream/_frontend/main.py @@ -99,7 +99,8 @@ def override_completions(cmd_param, ctx, args, incomplete): if isinstance(cmd_param.type, click.Path) and \ (cmd_param.name == 'elements' or cmd_param.name == 'element' or - cmd_param.name == 'except_'): + cmd_param.name == 'except_' or + cmd_param.opts == ['--track']): return complete_target(ctx, args, incomplete) raise CompleteUnhandled() @@ -193,23 +194,23 @@ def cli(context, **kwargs): @cli.command(short_help="Build elements in a pipeline") @click.option('--all', default=False, is_flag=True, help="Build elements that would not be needed for the current build plan") -@click.option('--track', default=False, is_flag=True, - help="Track new source references before building (implies --all)") +@click.option('--track', multiple=True, + type=click.Path(dir_okay=False, readable=True), + help="Specify elements to track during the build. Can be used " + "repeatedly to specify multiple elements") @click.option('--track-save', default=False, is_flag=True, - help="Track new source references before building, updating their " - "corresponding element files") + help="Write out the tracked references to their element files") @click.argument('elements', nargs=-1, type=click.Path(dir_okay=False, readable=True)) @click.pass_obj def build(app, elements, all, track, track_save): """Build elements in a pipeline""" - track_first = track or track_save - - app.initialize(elements, rewritable=track_save, inconsistent=track_first, use_remote_cache=True) + app.initialize(elements, rewritable=track_save) + app.pipeline.initialize(use_remote_cache=True, inconsistent=track) app.print_heading() try: - app.pipeline.build(app.scheduler, all, track_first, track_save) + app.pipeline.build(app.scheduler, all, track, track_save) click.echo("") app.print_summary() except PipelineError: @@ -248,8 +249,9 @@ def fetch(app, elements, deps, track, except_): plan: Only dependencies required for the build plan all: All dependencies """ - app.initialize(elements, except_=except_, - rewritable=track, inconsistent=track) + + app.initialize(elements, except_=except_, rewritable=track) + app.pipeline.initialize(inconsistent=elements if track else None) try: dependencies = app.pipeline.deps_elements(deps) app.print_heading(deps=dependencies) @@ -288,8 +290,8 @@ def track(app, elements, deps, except_): none: No dependencies, just the element itself all: All dependencies """ - app.initialize(elements, except_=except_, - rewritable=True, inconsistent=True) + app.initialize(elements, except_=except_, rewritable=True) + app.pipeline.initialize(inconsistent=elements) try: dependencies = app.pipeline.deps_elements(deps) app.print_heading(deps=dependencies) @@ -321,7 +323,8 @@ def pull(app, elements, deps): none: No dependencies, just the element itself all: All dependencies """ - app.initialize(elements, use_remote_cache=True) + app.initialize(elements) + app.pipeline.initialize(use_remote_cache=True) try: to_pull = app.pipeline.deps_elements(deps) app.pipeline.pull(app.scheduler, to_pull) @@ -351,7 +354,8 @@ def push(app, elements, deps): none: No dependencies, just the element itself all: All dependencies """ - app.initialize(elements, use_remote_cache=True) + app.initialize(elements) + app.pipeline.initialize(use_remote_cache=True) try: to_push = app.pipeline.deps_elements(deps) app.pipeline.push(app.scheduler, to_push) @@ -430,7 +434,8 @@ def show(app, elements, deps, except_, order, format, downloadable): bst show target.bst --format \\ $'---------- %{name} ----------\\n%{vars}' """ - app.initialize(elements, except_=except_, use_remote_cache=downloadable) + app.initialize(elements, except_=except_) + app.pipeline.initialize(use_remote_cache=downloadable) try: dependencies = app.pipeline.deps_elements(deps) except PipelineError as e: @@ -483,6 +488,7 @@ def shell(app, element, sysroot, build, command): scope = Scope.RUN app.initialize((element,)) + app.pipeline.initialize() # Assert we have everything we need built. missing_deps = [] @@ -527,6 +533,7 @@ def checkout(app, element, directory, force, integrate, hardlinks): """Checkout a built artifact to the specified directory """ app.initialize((element,)) + app.pipeline.initialize() try: app.pipeline.checkout(directory, force, integrate, hardlinks) click.echo("") @@ -558,7 +565,8 @@ def checkout(app, element, directory, force, integrate, hardlinks): def source_bundle(app, target, force, directory, track, compression, except_): """Produce a source bundle to be manually executed""" - app.initialize((target,), rewritable=track, inconsistent=track) + app.initialize((target,), rewritable=track) + app.pipeline.initialize(inconsistent=[target]) try: dependencies = app.pipeline.deps_elements('all') app.print_heading(dependencies) @@ -599,7 +607,8 @@ def workspace(): def workspace_open(app, no_checkout, force, source, track, element, directory): """Open a workspace for manual source modification""" - app.initialize((element,), rewritable=track, inconsistent=track) + app.initialize((element,), rewritable=track) + app.pipeline.initialize(inconsistent=[element]) try: app.pipeline.open_workspace(app.scheduler, directory, source, no_checkout, track, force) click.echo("") @@ -624,6 +633,7 @@ def workspace_close(app, source, remove_dir, element): """Close a workspace""" app.initialize((element,)) + app.pipeline.initialize() if app.interactive and remove_dir: if not click.confirm('This will remove all your changes, are you sure?'): click.echo('Aborting') @@ -654,6 +664,7 @@ def workspace_close(app, source, remove_dir, element): def workspace_reset(app, source, track, no_checkout, element): """Reset a workspace to its original state""" app.initialize((element,)) + app.pipeline.initialize() if app.interactive: if not click.confirm('This will remove all your changes, are you sure?'): click.echo('Aborting') @@ -847,9 +858,7 @@ class App(): try: self.pipeline = Pipeline(self.context, self.project, elements, except_, - inconsistent=inconsistent, rewritable=rewritable, - use_remote_cache=use_remote_cache, load_ticker=self.load_ticker, resolve_ticker=self.resolve_ticker, remote_ticker=self.remote_ticker, diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index ec374df71..c1d3c1b56 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -51,7 +51,7 @@ class Planner(): # Here we want to traverse the same element more than once when # it is reachable from multiple places, with the interest of finding # the deepest occurance of every element - def plan_element(self, element, depth): + def plan_element(self, element, depth, ignore_cache): if element in self.visiting_elements: # circular dependency, already being processed return @@ -63,22 +63,22 @@ class Planner(): self.visiting_elements.add(element) for dep in element.dependencies(Scope.RUN, recurse=False): - self.plan_element(dep, depth) + self.plan_element(dep, depth, ignore_cache) # Dont try to plan builds of elements that are cached already - if not element._cached() and not element._remotely_cached(): + if ignore_cache or (not element._cached() and not element._remotely_cached()): for dep in element.dependencies(Scope.BUILD, recurse=False): - self.plan_element(dep, depth + 1) + self.plan_element(dep, depth + 1, ignore_cache) self.depth_map[element] = depth self.visiting_elements.remove(element) - def plan(self, roots): + def plan(self, roots, ignore_cache=False): for root in roots: - self.plan_element(root, 0) + self.plan_element(root, 0, ignore_cache) depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True) - return [item[0] for item in depth_sorted if not item[0]._cached()] + return [item[0] for item in depth_sorted if ignore_cache or not item[0]._cached()] # Pipeline() @@ -112,13 +112,11 @@ class Planner(): class Pipeline(): def __init__(self, context, project, targets, except_, - inconsistent=False, rewritable=False, - use_remote_cache=False, - load_ticker=None, - resolve_ticker=None, remote_ticker=None, - cache_ticker=None): + cache_ticker=None, + load_ticker=None, + resolve_ticker=None): self.context = context self.project = project self.session_elements = 0 @@ -130,6 +128,8 @@ class Pipeline(): Platform._create_instance(context, project) self.platform = Platform.get_platform() self.artifacts = self.platform.artifactcache + self.remote_ticker = remote_ticker + self.cache_ticker = cache_ticker loader = Loader(self.project.element_path, targets + except_, self.project._options) @@ -152,14 +152,26 @@ class Pipeline(): if resolve_ticker: resolve_ticker(None) - # Preflight directly after resolving elements, before ever interrogating - # caches or anything. - for plugin in self.dependencies(Scope.ALL, include_sources=True): - plugin.preflight() + def initialize(self, use_remote_cache=False, inconsistent=None): + # Preflight directly, before ever interrogating caches or + # anything. + self.preflight() self.total_elements = len(list(self.dependencies(Scope.ALL))) - for element_name, source, workspace in project._list_workspaces(): + self.initialize_workspaces() + + if use_remote_cache and self.artifacts.can_fetch(): + self.fetch_remote_refs() + + self.resolve_cache_keys(inconsistent) + + def preflight(self): + for plugin in self.dependencies(Scope.ALL, include_sources=True): + plugin.preflight() + + def initialize_workspaces(self): + for element_name, source, workspace in self.project._list_workspaces(): for target in self.targets: element = target.search(Scope.ALL, element_name) @@ -169,21 +181,25 @@ class Pipeline(): self.project._set_workspace(element, source, workspace) - if use_remote_cache and self.artifacts.can_fetch(): - try: - if remote_ticker: - remote_ticker(self.artifacts.url) - self.artifacts.initialize_remote() - self.artifacts.fetch_remote_refs() - except ArtifactError: - self.message(MessageType.WARN, "Failed to fetch remote refs") - self.artifacts.set_offline() + def fetch_remote_refs(self): + try: + if self.remote_ticker: + self.remote_ticker(self.artifacts.url) + self.artifacts.initialize_remote() + self.artifacts.fetch_remote_refs() + except ArtifactError: + self.message(MessageType.WARN, "Failed to fetch remote refs") + self.artifacts.set_offline() + + def resolve_cache_keys(self, inconsistent): + if inconsistent: + inconsistent = self.get_elements_to_track(inconsistent) for element in self.dependencies(Scope.ALL): - if cache_ticker: - cache_ticker(element.name) + if self.cache_ticker: + self.cache_ticker(element.name) - if inconsistent: + if inconsistent and element in inconsistent: # Load the pipeline in an explicitly inconsistent state, use # this for pipelines with tracking queues enabled. element._force_inconsistent() @@ -192,8 +208,8 @@ class Pipeline(): # for the first time. element._cached() - if cache_ticker: - cache_ticker(None) + if self.cache_ticker: + self.cache_ticker(None) # Generator function to iterate over elements and optionally # also iterate over sources. @@ -234,9 +250,11 @@ class Pipeline(): # which are required to build the pipeline target, omitting # cached elements. The elements are yielded in a depth sorted # ordering for optimal build plans - def plan(self): + def plan(self, except_=True): build_plan = Planner().plan(self.targets) - self.remove_elements(build_plan) + + if except_: + build_plan = self.remove_elements(build_plan) for element in build_plan: yield element @@ -302,7 +320,7 @@ class Pipeline(): def track(self, scheduler, dependencies): dependencies = list(dependencies) - track = TrackQueue() + track = TrackQueue(save=True) track.enqueue(dependencies) self.session_elements = len(dependencies) @@ -374,6 +392,15 @@ class Pipeline(): "Fetched {} elements".format(fetched), elapsed=elapsed) + def get_elements_to_track(self, track_targets): + planner = Planner() + + target_elements = [e for e in self.dependencies(Scope.ALL) + if e.name in track_targets] + track_elements = planner.plan(target_elements, ignore_cache=True) + + return self.remove_elements(track_elements) + # build() # # Builds (assembles) elements in the pipeline. @@ -393,15 +420,30 @@ class Pipeline(): detail="\n".join([el + "-" + str(src) for el, src, _ in self.unused_workspaces])) - if build_all or track_first: - plan = list(self.dependencies(Scope.ALL)) + # We set up two plans; one to track elements, the other to + # build them once tracking has finished. The first plan + # contains elements from track_first, the second contains the + # target elements. + # + # The reason we can't use one plan is that the tracking + # elements may consist of entirely different elements. + track_plan = [] + if track_first: + track_plan = self.get_elements_to_track(track_first) + + if build_all: + plan = self.dependencies(Scope.ALL) else: - plan = list(self.plan()) + plan = self.plan() - # Assert that we have a consistent pipeline, or that - # the track option will make it consistent - if not track_first: - self.assert_consistent(plan) + # We want to start the build queue with any elements that are + # not being tracked first + track_elements = set(track_plan) + plan = [e for e in plan if e not in track_elements] + + # Assert that we have a consistent pipeline now (elements in + # track_plan will be made consistent) + self.assert_consistent(plan) fetch = FetchQueue(skip_cached=True) build = BuildQueue() @@ -409,7 +451,7 @@ class Pipeline(): pull = None push = None queues = [] - if track_first: + if track_plan: track = TrackQueue(save=save) queues.append(track) if self.artifacts.can_fetch(): @@ -420,9 +462,14 @@ class Pipeline(): if self.artifacts.can_push(): push = PushQueue() queues.append(push) - queues[0].enqueue(plan) - self.session_elements = len(plan) + if track: + queues[0].enqueue(track_plan) + queues[1].enqueue(plan) + else: + queues[0].enqueue(plan) + + self.session_elements = len(track_plan) + len(plan) self.message(MessageType.START, "Starting build") elapsed, status = scheduler.run(queues) @@ -792,7 +839,7 @@ class Pipeline(): # use in the result, this function reports a list that is appropriate for # the selected option. # - def deps_elements(self, mode, except_=None): + def deps_elements(self, mode): elements = None if mode == 'none': -- cgit v1.2.1