-rw-r--r--   buildstream/_frontend/app.py    |  70
-rw-r--r--   buildstream/_frontend/status.py |  25
-rw-r--r--   buildstream/_frontend/widget.py |  37
-rw-r--r--   buildstream/_pipeline.py        | 369
-rw-r--r--   buildstream/_stream.py          | 688
-rw-r--r--   tests/frontend/buildtrack.py    |  86
-rw-r--r--   tests/plugins/pipeline.py       |  16
7 files changed, 685 insertions, 606 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 58a8eab89..de23c12b0 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -39,7 +39,6 @@ from .._project import Project
 from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
 from .._message import Message, MessageType, unconditional_messages
 from .._stream import Stream
-from .._scheduler import Scheduler
 from .._versions import BST_FORMAT_VERSION
 from .. import __version__ as build_stream_version
 from .. import _yaml
@@ -69,7 +68,6 @@ class App():
         self.context = None        # The Context object
         self.stream = None         # The Stream object
         self.project = None        # The toplevel Project object
-        self.scheduler = None      # The Scheduler
         self.logger = None         # The LogLine object
         self.interactive = None    # Whether we are running in interactive mode
         self.colors = None         # Whether to use colors in logging
@@ -83,6 +81,7 @@ class App():
         self._status = None                # The Status object
         self._fail_messages = {}           # Failure messages by unique plugin id
         self._interactive_failures = None  # Whether to handle failures interactively
+        self._started = False              # Whether a session has started

         # UI Colors Profiles
         self._content_profile = Profile(fg='yellow')
@@ -202,14 +201,12 @@ class App():
             self._error_exit(e, "Error loading project")

         # Create the stream right away, we'll need to pass it around
-        self.stream = Stream(self.context, self.project, self.loaded_cb)
-
-        # Create the application's scheduler
-        self.scheduler = Scheduler(self.context, self._session_start,
-                                   interrupt_callback=self._interrupt_handler,
-                                   ticker_callback=self._tick,
-                                   job_start_callback=self._job_started,
-                                   job_complete_callback=self._job_completed)
+        self.stream = Stream(self.context, self.project, self._session_start,
+                             session_start_callback=self.session_start_cb,
+                             interrupt_callback=self._interrupt_handler,
+                             ticker_callback=self._tick,
+                             job_start_callback=self._job_started,
+                             job_complete_callback=self._job_completed)

         # Create the logger right before setting the message handler
         self.logger = LogLine(self.context,
@@ -224,8 +221,7 @@ class App():
         self._status = Status(self.context,
                               self._content_profile, self._format_profile,
                               self._success_profile, self._error_profile,
-                              self.stream, self.scheduler,
-                              colors=self.colors)
+                              self.stream, colors=self.colors)

         # Propagate pipeline feedback to the user
         self.context.set_message_handler(self._message_handler)
@@ -238,10 +234,6 @@ class App():
         if session_name:
             self._message(MessageType.START, session_name)

-            # XXX This is going to change soon !
-            #
-            self.stream._scheduler = self.scheduler
-
         # Run the body of the session here, once everything is loaded
         try:
             yield
@@ -249,14 +241,15 @@ class App():

             # Print a nice summary if this is a session
             if session_name:
-                elapsed = self.scheduler.elapsed_time()
+                elapsed = self.stream.elapsed_time

                 if isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
                     self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
                 else:
                     self._message(MessageType.FAIL, session_name, elapsed=elapsed)

-                self._print_summary()
+                if self._started:
+                    self._print_summary()

             # Exit with the error
             self._error_exit(e)
@@ -264,8 +257,9 @@ class App():
         else:
             # No exceptions occurred, print session time and summary
             if session_name:
-                self._message(MessageType.SUCCESS, session_name, elapsed=self.scheduler.elapsed_time())
-                self._print_summary()
+                self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+                if self._started:
+                    self._print_summary()

     # init_project()
     #
@@ -400,8 +394,8 @@ class App():

         # If the scheduler has started, try to terminate all jobs gracefully,
         # otherwise exit immediately.
-        if self.scheduler.loop:
-            self.scheduler.terminate_jobs()
+        if self.stream.running:
+            self.stream.terminate()
         else:
             sys.exit(-1)

@@ -411,8 +405,8 @@ class App():
     def _maybe_render_status(self):

         # If we're suspended or terminating, then dont render the status area
-        if self._status and self.scheduler and \
-           not (self.scheduler.suspended or self.scheduler.terminated):
+        if self._status and self.stream and \
+           not (self.stream.suspended or self.stream.terminated):
             self._status.render()

     #
@@ -423,7 +417,7 @@ class App():
         # Only handle ^C interactively in interactive mode
         if not self.interactive:
             self._status.clear()
-            self.scheduler.terminate_jobs()
+            self.stream.terminate()
             return

         # Here we can give the user some choices, like whether they would
@@ -452,11 +446,11 @@ class App():

         if choice == 'terminate':
             click.echo("\nTerminating all jobs at user request\n", err=True)
-            self.scheduler.terminate_jobs()
+            self.stream.terminate()
         else:
             if choice == 'quit':
                 click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
-                self.scheduler.stop_queueing()
+                self.stream.quit()
             elif choice == 'continue':
                 click.echo("\nContinuing\n", err=True)

@@ -473,7 +467,7 @@ class App():

         # Dont attempt to handle a failure if the user has already opted to
         # terminate
-        if not success and not self.scheduler.terminated:
+        if not success and not self.stream.terminated:

             # Get the last failure message for additional context
             failure = self._fail_messages.get(element._get_unique_id())
@@ -494,9 +488,9 @@ class App():
             if not self._interactive_failures:

                 if self.context.sched_error_action == 'terminate':
-                    self.scheduler.terminate_jobs()
+                    self.stream.terminate()
                 elif self.context.sched_error_action == 'quit':
-                    self.scheduler.stop_queueing()
+                    self.stream.quit()
                 elif self.context.sched_error_action == 'continue':
                     pass
                 return
@@ -551,11 +545,11 @@ class App():

                 if choice == 'terminate':
                     click.echo("\nTerminating all jobs\n", err=True)
-                    self.scheduler.terminate_jobs()
+                    self.stream.terminate()
                 else:
                     if choice == 'quit':
                         click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
-                        self.scheduler.stop_queueing()
+                        self.stream.quit()
                     elif choice == 'continue':
                         click.echo("\nContinuing with other non failing elements\n", err=True)
                     elif choice == 'retry':
@@ -567,10 +561,12 @@ class App():
     # Print the session heading if we've loaded a pipeline and there
     # is going to be a session
     #
-    def loaded_cb(self, pipeline):
+    def session_start_cb(self):
+        self._started = True
         if self._session_name:
-            self.logger.print_heading(pipeline,
-                                      self._main_options['log_file'],
+            self.logger.print_heading(self.project,
+                                      self.stream,
+                                      log_file=self._main_options['log_file'],
                                       styling=self.colors)

     #
@@ -578,7 +574,7 @@ class App():
     #
     def _print_summary(self):
         click.echo("", err=True)
-        self.logger.print_summary(self.stream, self.scheduler,
+        self.logger.print_summary(self.stream,
                                   self._main_options['log_file'],
                                   styling=self.colors)

@@ -642,7 +638,7 @@ class App():
     def _interrupted(self):
         self._status.clear()
         try:
-            with self.scheduler.jobs_suspended():
+            with self.stream.suspend():
                 yield
         finally:
             self._maybe_render_status()
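Editor's note: with the Scheduler removed from the frontend, every control operation above goes through Stream. A minimal sketch of the new wiring, mirroring App.initialized() in this diff — the context/project setup and the callback bodies are placeholders, and the job/ticker callback signatures are not shown here, so the lambdas accept anything:

    # Sketch only: Stream lives in a private module (buildstream._stream);
    # it is constructed exactly as App does above, with the session start
    # time and the frontend callbacks.
    from datetime import datetime

    from buildstream._stream import Stream

    def create_stream(context, project):

        def on_session_start():
            print("session started, heading can be printed now")

        def on_interrupt():
            # Mirrors App._interrupt_handler(): either drain ongoing jobs
            # or cancel them outright.
            stream.quit()         # complete ongoing tasks, queue nothing new
            # stream.terminate()  # ...or terminate ongoing jobs too

        stream = Stream(context, project, datetime.now(),
                        session_start_callback=on_session_start,
                        interrupt_callback=on_interrupt,
                        ticker_callback=lambda *args: None,
                        job_start_callback=lambda *args: None,
                        job_complete_callback=lambda *args: None)
        return stream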
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index 13e00f58f..0e5855181 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -39,7 +39,6 @@ from .widget import TimeCode
 #    success_profile (Profile): Formatting profile for success text
 #    error_profile (Profile): Formatting profile for error text
 #    stream (Stream): The Stream
-#    scheduler (Scheduler): The Scheduler
 #    colors (bool): Whether to print the ANSI color codes in the output
 #
 class Status():
@@ -47,14 +46,14 @@ class Status():

     def __init__(self, context,
                  content_profile, format_profile,
                  success_profile, error_profile,
-                 stream, scheduler, colors=False):
+                 stream, colors=False):

         self._context = context
         self._content_profile = content_profile
         self._format_profile = format_profile
         self._success_profile = success_profile
         self._error_profile = error_profile
-        self._scheduler = scheduler
+        self._stream = stream
         self._jobs = []
         self._last_lines = 0  # Number of status lines we last printed to console
         self._term = Terminal()
@@ -63,7 +62,7 @@ class Status():
         self._header = _StatusHeader(context,
                                      content_profile, format_profile,
                                      success_profile, error_profile,
-                                     stream, scheduler)
+                                     stream)

         self._term_width, _ = click.get_terminal_size()
         self._alloc_lines = 0
@@ -80,7 +79,7 @@ class Status():
     #    action_name (str): The action name for this job
     #
     def add_job(self, element, action_name):
-        elapsed = self._scheduler.elapsed_time()
+        elapsed = self._stream.elapsed_time
         job = _StatusJob(self._context, element, action_name,
                          self._content_profile, self._format_profile, elapsed)
         self._jobs.append(job)
         self._need_alloc = True
@@ -136,7 +135,7 @@ class Status():
         if not self._term.does_styling:
             return

-        elapsed = self._scheduler.elapsed_time()
+        elapsed = self._stream.elapsed_time

         self.clear()
         self._check_term_width()
@@ -251,14 +250,13 @@ class Status():
 #    success_profile (Profile): Formatting profile for success text
 #    error_profile (Profile): Formatting profile for error text
 #    stream (Stream): The Stream
-#    scheduler (Scheduler): The Scheduler
 #
 class _StatusHeader():

     def __init__(self, context,
                  content_profile, format_profile,
                  success_profile, error_profile,
-                 stream, scheduler):
+                 stream):

         #
         # Public members
@@ -273,7 +271,6 @@ class _StatusHeader():
         self._success_profile = success_profile
         self._error_profile = error_profile
         self._stream = stream
-        self._scheduler = scheduler
         self._time_code = TimeCode(context, content_profile, format_profile)
         self._context = context

@@ -283,8 +280,8 @@ class _StatusHeader():
         size = 0
         text = ''

-        session = str(self._stream.session_elements)
-        total = str(self._stream.total_elements)
+        session = str(len(self._stream.session_elements))
+        total = str(len(self._stream.total_elements))

         # Format and calculate size for target and overall time code
         size += len(total) + len(session) + 4  # Size for (N/N) with a leading space
@@ -303,10 +300,10 @@ class _StatusHeader():
         text = ''

         # Format and calculate size for each queue progress
-        for queue in self._scheduler.queues:
+        for queue in self._stream.queues:

             # Add spacing
-            if self._scheduler.queues.index(queue) > 0:
+            if self._stream.queues.index(queue) > 0:
                 size += 2
                 text += self._format_profile.fmt('→ ')

@@ -366,7 +363,7 @@ class _StatusHeader():
 #    action_name (str): The name of the action
 #    content_profile (Profile): Formatting profile for content text
 #    format_profile (Profile): Formatting profile for formatting text
-#    elapsed (datetime): The offset of the scheduler when this job is created
+#    elapsed (datetime): The offset into the session when this job is created
 #
 class _StatusJob():
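Note: session_elements and total_elements change type in this diff, from running counts to actual lists of Element, and elapsed_time becomes a property rather than a method call. A small sketch of what the header arithmetic above now looks like from a frontend's point of view (format_progress is a hypothetical helper, not BuildStream code):

    # The "(N/M)" session/total figures are now measured with len(),
    # and elapsed_time is read as a property.
    def format_progress(stream):
        session = len(stream.session_elements)  # elements processed this session
        total = len(stream.total_elements)      # all elements reachable from the targets
        return "({}/{}) elapsed: {}".format(session, total, stream.elapsed_time)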
diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py
index 814f87ff5..5b405682a 100644
--- a/buildstream/_frontend/widget.py
+++ b/buildstream/_frontend/widget.py
@@ -28,7 +28,7 @@ import click
 from ruamel import yaml

 from . import Profile
-from .. import Element, Scope, Consistency
+from .. import Element, Consistency
 from .. import _yaml
 from .. import __version__ as bst_version
 from .._exceptions import ImplError
@@ -435,23 +435,17 @@ class LogLine(Widget):
     # and so on.
     #
     # Args:
-    #    pipeline (Pipeline): The pipeline to print the heading of
+    #    project (Project): The toplevel project we were invoked from
+    #    stream (Stream): The stream
     #    log_file (file): An optional file handle for additional logging
-    #    deps (list): Optional list of elements, default is to use the whole pipeline
     #    styling (bool): Whether to enable ansi escape codes in the output
     #
-    def print_heading(self, pipeline, log_file, deps=None, styling=False):
+    def print_heading(self, project, stream, *, log_file, styling=False):
         context = self.context
-        project = pipeline.project
         starttime = datetime.datetime.now()
         text = ''

-        assert self._resolved_keys is None
-        elements = set()
-        visited = {}
-        for element in pipeline.targets:
-            elements.update(element.dependencies(Scope.ALL, visited=visited))
-        self._resolved_keys = {element: element._get_cache_key() for element in elements}
+        self._resolved_keys = {element: element._get_cache_key() for element in stream.session_elements}

         # Main invocation context
         text += '\n'
@@ -459,7 +453,7 @@ class LogLine(Widget):
         values = OrderedDict()
         values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S')
         values["Project"] = "{} ({})".format(project.name, project.directory)
-        values["Targets"] = ", ".join([t.name for t in pipeline.targets])
+        values["Targets"] = ", ".join([t.name for t in stream.targets])
         text += self._format_values(values)

         # User configurations
@@ -494,9 +488,7 @@ class LogLine(Widget):

         # Pipeline state
         text += self.content_profile.fmt("Pipeline\n", bold=True)
-        if deps is None:
-            deps = pipeline.dependencies(Scope.ALL)
-        text += self.show_pipeline(deps, context.log_element_format)
+        text += self.show_pipeline(stream.total_elements, context.log_element_format)
         text += '\n'

         # Separator line before following output
@@ -512,16 +504,15 @@ class LogLine(Widget):
     #
     # Args:
     #    stream (Stream): The Stream
-    #    scheduler (Scheduler): The Scheduler
     #    log_file (file): An optional file handle for additional logging
     #    styling (bool): Whether to enable ansi escape codes in the output
     #
-    def print_summary(self, stream, scheduler, log_file, styling=False):
+    def print_summary(self, stream, log_file, styling=False):

         # Early silent return if there are no queues, can happen
-        # only in the case that the pipeline early returned due to
+        # only in the case that the stream early returned due to
         # an inconsistent pipeline state.
-        if scheduler.queues is None:
+        if not stream.queues:
             return

         text = ''
@@ -544,18 +535,18 @@ class LogLine(Widget):
         text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
         values = OrderedDict()

-        values['Total'] = self.content_profile.fmt(str(stream.total_elements))
-        values['Session'] = self.content_profile.fmt(str(stream.session_elements))
+        values['Total'] = self.content_profile.fmt(str(len(stream.total_elements)))
+        values['Session'] = self.content_profile.fmt(str(len(stream.session_elements)))

         processed_maxlen = 1
         skipped_maxlen = 1
         failed_maxlen = 1
-        for queue in scheduler.queues:
+        for queue in stream.queues:
             processed_maxlen = max(len(str(len(queue.processed_elements))), processed_maxlen)
             skipped_maxlen = max(len(str(len(queue.skipped_elements))), skipped_maxlen)
             failed_maxlen = max(len(str(len(queue.failed_elements))), failed_maxlen)

-        for queue in scheduler.queues:
+        for queue in stream.queues:
             processed = str(len(queue.processed_elements))
             skipped = str(len(queue.skipped_elements))
             failed = str(len(queue.failed_elements))
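Note: the early-return guard above changes from an identity check to a truthiness check. Stream initializes its queues attribute to an empty list rather than None, so the new test has to cover both "never ran" and "ran with no queues"; a tiny demonstration of why the old check would no longer fire:

    queues = []                # Stream.__init__() sets self.queues = []
    assert queues is not None  # the old `scheduler.queues is None` test falls through...
    assert not queues          # ...while the new falsy test still triggers the early return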
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 5327d3fb6..8861556c9 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -18,13 +18,16 @@
 #  Authors:
 #        Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
 #        Jürg Billeter <juerg.billeter@codethink.co.uk>
+#        Tristan Maat <tristan.maat@codethink.co.uk>

+import os
 import itertools
 from operator import itemgetter

 from ._exceptions import PipelineError
 from ._message import Message, MessageType
 from ._loader import Loader
+from ._profile import Topics, profile_start, profile_end
 from .element import Element
 from . import Scope, Consistency
 from ._project import ProjectRefStorage
@@ -60,60 +63,59 @@ class PipelineSelection():
 # Pipeline()
 #
 # Args:
-#    context (Context): The Context object
 #    project (Project): The Project object
-#    target (str): A bst filename relative to the project directory
-#    inconsistent (bool): Whether to load the pipeline in a forcefully inconsistent state,
-#                         this is appropriate when source tracking will run and the
-#                         current source refs will not be the effective refs.
-#    rewritable (bool): Whether the loaded files should be rewritable
-#                       this is a bit more expensive due to deep copies
-#    fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
-#                              loading process, if they are not yet locally cached
-#
-# The ticker methods will be called with an element name for each tick, a final
-# tick with None as the argument is passed to signal that processing of this
-# stage has terminated.
-#
-# Raises:
-#    LoadError
-#    PluginError
-#    SourceError
-#    ElementError
-#    ProgramNotFoundError
+#    context (Context): The Context object
+#    artifacts (Context): The ArtifactCache object
 #
 class Pipeline():

-    def __init__(self, context, project, artifacts, targets, except_, *,
-                 rewritable=False,
-                 fetch_subprojects=True):
+    def __init__(self, context, project, artifacts):

-        self.context = context   # The Context
-        self.project = project   # The toplevel project
-        self.targets = []        # List of toplevel target Element objects
+        self._context = context     # The Context
+        self._project = project     # The toplevel project

         #
         # Private members
         #
         self._artifacts = artifacts
         self._loader = None
-        self._exceptions = None
-        self._track_cross_junctions = False
-        self._track_elements = []

-        #
-        # Early initialization
-        #
+    # load()
+    #
+    # Loads elements from target names.
+    #
+    # This function is called with a list of lists, such that multiple
+    # target groups may be specified. Element names specified in `targets`
+    # are allowed to be redundant.
+    #
+    # Args:
+    #    target_groups (list of lists): Groups of toplevel targets to load
+    #    fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
+    #                              loading process, if they are not yet locally cached
+    #    rewritable (bool): Whether the loaded files should be rewritable
+    #                       this is a bit more expensive due to deep copies
+    #
+    # Returns:
+    #    (tuple of lists): A tuple of grouped Element objects corresponding to target_groups
+    #
+    def load(self, target_groups, *,
+             fetch_subprojects=True,
+             rewritable=False):
+
+        # First concatenate all the lists for the loader's sake
+        targets = list(itertools.chain(*target_groups))

-        self._loader = Loader(self.context, self.project, targets + except_,
+        profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))
+
+        self._loader = Loader(self._context, self._project, targets,
                               fetch_subprojects=fetch_subprojects)

-        with self.context.timed_activity("Loading pipeline", silent_nested=True):
+        with self._context.timed_activity("Loading pipeline", silent_nested=True):
             meta_elements = self._loader.load(rewritable, None)

         # Resolve the real elements now that we've loaded the project
-        with self.context.timed_activity("Resolving pipeline"):
-            resolved_elements = [
+        with self._context.timed_activity("Resolving pipeline"):
+            elements = [
                 Element._new_from_meta(meta, self._artifacts)
                 for meta in meta_elements
             ]
@@ -130,61 +132,89 @@ class Pipeline():
                 detail += "\n".join(lines)
                 self._message(MessageType.WARN, "Ignoring redundant source references", detail=detail)

-        self.targets = resolved_elements[:len(targets)]
-        self._exceptions = resolved_elements[len(targets):]
+        # Now create element groups to match the input target groups
+        elt_iter = iter(elements)
+        element_groups = [
+            [next(elt_iter) for i in range(len(group))]
+            for group in target_groups
+        ]
+
+        profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))
+
+        return tuple(element_groups)

-    # initialize()
+    # resolve_elements()
     #
-    # Initialize the pipeline
+    # Resolve element state and cache keys.
     #
     # Args:
-    #    track_element (list of Elements): List of elements specified by the frontend for tracking
-    #    track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
-    #    track_selection (PipelineSelection): The selection algorithm for track elements
+    #    targets (list of Element): The list of toplevel element targets
     #
-    def initialize(self,
-                   track_elements=None,
-                   track_cross_junctions=False,
-                   track_selection=PipelineSelection.ALL):
+    def resolve_elements(self, targets):
+        with self._context.timed_activity("Resolving cached state", silent_nested=True):
+            for element in self.dependencies(targets, Scope.ALL):

-        # Preflight directly, before ever interrogating caches or anything.
-        self._preflight()
+                # Preflight
+                element._preflight()

-        # Work out what we're going track, if anything
-        self._track_cross_junctions = track_cross_junctions
-        if track_elements:
-            self._track_elements = self._get_elements_to_track(track_elements, track_selection)
-
-        # Now resolve the cache keys once tracking elements have been resolved
-        self._resolve_cache_keys()
+                # Determine initial element state.
+                element._update_state()

-    # cleanup()
+    # dependencies()
     #
-    # Cleans up resources used by the Pipeline.
+    # Generator function to iterate over elements and optionally
+    # also iterate over sources.
     #
-    def cleanup(self):
-        if self._loader:
-            self._loader.cleanup()
+    # Args:
+    #    targets (list of Element): The target Elements to loop over
+    #    scope (Scope): The scope to iterate over
+    #    recurse (bool): Whether to recurse into dependencies
+    #
+    def dependencies(self, targets, scope, *, recurse=True):
+        # Keep track of 'visited' in this scope, so that all targets
+        # share the same context.
+        visited = {}

-        # Reset the element loader state
-        Element._reset_load_state()
+        for target in targets:
+            for element in target.dependencies(scope, recurse=recurse, visited=visited):
+                yield element
+
+    # plan()
+    #
+    # Generator function to iterate over only the elements
+    # which are required to build the pipeline target, omitting
+    # cached elements. The elements are yielded in a depth sorted
+    # ordering for optimal build plans
+    #
+    # Args:
+    #    elements (list of Element): List of target elements to plan
+    #
+    # Returns:
+    #    (list of Element): A depth sorted list of the build plan
+    #
+    def plan(self, elements):
+        return _Planner().plan(elements)

     # get_selection()
     #
+    # Gets a full list of elements based on a toplevel
+    # list of element targets
+    #
     # Args:
+    #    targets (list of Element): The target Elements
     #    mode (PipelineSelection): The PipelineSelection mode
     #
     # Various commands define a --deps option to specify what elements to
     # use in the result, this function reports a list that is appropriate for
     # the selected option.
     #
-    def get_selection(self, mode):
+    def get_selection(self, targets, mode):

         elements = None
         if mode == PipelineSelection.NONE:
-            elements = self.targets
+            elements = targets
         elif mode == PipelineSelection.PLAN:
-            elements = list(self._plan())
+            elements = self.plan(targets)
         else:
             if mode == PipelineSelection.ALL:
                 scope = Scope.ALL
@@ -193,49 +223,27 @@ class Pipeline():
             elif mode == PipelineSelection.RUN:
                 scope = Scope.RUN

-            elements = list(self.dependencies(scope))
-
-        return self.remove_elements(elements)
-
-    # dependencies()
-    #
-    # Generator function to iterate over elements and optionally
-    # also iterate over sources.
-    #
-    # Args:
-    #    scope (Scope): The scope to iterate over
-    #    recurse (bool): Whether to recurse into dependencies
-    #    include_sources (bool): Whether to include element sources in iteration
-    #
-    def dependencies(self, scope, *, recurse=True, include_sources=False):
-        # Keep track of 'visited' in this scope, so that all targets
-        # share the same context.
-        visited = {}
-
-        for target in self.targets:
-            for element in target.dependencies(scope, recurse=recurse, visited=visited):
-                if include_sources:
-                    for source in element.sources():
-                        yield source
-                yield element
+            elements = list(self.dependencies(targets, scope))

-    #############################################################
-    #                         Commands                          #
-    #############################################################
+        return elements

-    # remove_elements():
-    #
-    # Internal function
+    # except_elements():
     #
     # Return what we are left with after the intersection between
     # excepted and target elements and their unique dependencies is
     # gone.
     #
     # Args:
-    #    elements (list of elements): The list to remove elements from.
-    def remove_elements(self, elements):
-        targeted = list(self.dependencies(Scope.ALL))
-
+    #    targets (list of Element): List of toplevel targetted elements
+    #    elements (list of Element): The list to remove elements from
+    #    except_targets (list of Element): List of toplevel except targets
+    #
+    # Returns:
+    #    (list of Element): The elements list with the intersected
+    #                       exceptions removed
+    #
+    def except_elements(self, targets, elements, except_targets):
+        targeted = list(self.dependencies(targets, Scope.ALL))
         visited = []

         def find_intersection(element):
@@ -255,7 +263,7 @@ class Pipeline():
         # elements that lie on the border closest to excepted elements
         # between excepted and target elements.
         intersection = list(itertools.chain.from_iterable(
-            find_intersection(element) for element in self._exceptions
+            find_intersection(element) for element in except_targets
         ))

         # Now use this set of elements to traverse the targeted
@@ -264,7 +272,7 @@ class Pipeline():
         queue = []
         visited = []

-        queue.extend(self.targets)
+        queue.extend(targets)
         while queue:
             element = queue.pop()
             if element in visited or element in intersection:
@@ -282,89 +290,77 @@ class Pipeline():
         # in before.
         return [element for element in elements if element in visited]

-    #############################################################
-    #                     Private Methods                       #
-    #############################################################
-
-    # _get_elements_to_track():
+    # targets_include()
+    #
+    # Checks whether the given targets are, or depend on some elements
     #
-    # Work out which elements are going to be tracked.
+    # Args:
+    #    targets (list of Element): A list of targets
+    #    elements (list of Element): List of elements to check
+    #
+    # Returns:
+    #    (bool): True if all of `elements` are the `targets`, or are
+    #            somehow depended on by `targets`.
     #
-    # Currently the 'mode' parameter only accepts
-    # PipelineSelection.NONE or PipelineSelection.ALL
+    def targets_include(self, targets, elements):
+        target_element_set = set(self.dependencies(targets, Scope.ALL))
+        element_set = set(elements)
+        return element_set.issubset(target_element_set)
+
+    # subtract_elements()
     #
-    # This makes the assumption that the except elements are
-    # meant to be removed from tracking element lists.
+    # Subtract a subset of elements
     #
     # Args:
-    #    track_targets (list of str): List of target names
-    #    mode (PipelineSelection): The PipelineSelection mode
+    #    elements (list of Element): The element list
+    #    subtract (list of Element): List of elements to subtract from elements
     #
     # Returns:
-    #    (list): List of Element objects to track
+    #    (list): The original elements list, with elements in subtract removed
     #
-    def _get_elements_to_track(self, track_targets, mode=PipelineSelection.ALL):
-        planner = _Planner()
-
-        # Convert target names to elements
-        track_elements = [e for e in self.dependencies(Scope.ALL)
-                          if e.name in track_targets]
-
-        if mode != PipelineSelection.NONE:
-            assert mode == PipelineSelection.ALL
-
-            # Plan them out
-            track_elements = planner.plan(track_elements, ignore_cache=True)
-
-            # Filter out --except elements
-            track_elements = self.remove_elements(track_elements)
-
-        # Filter out cross junctioned elements
-        if self._track_cross_junctions:
-            self._assert_junction_tracking(track_elements)
-        else:
-            track_elements = self._filter_cross_junctions(track_elements)
-
-        return track_elements
+    def subtract_elements(self, elements, subtract):
+        subtract_set = set(subtract)
+        return [
+            e for e in elements
+            if e not in subtract_set
+        ]

-    # _prefilght()
+    # track_cross_junction_filter()
     #
-    # Preflights all the plugins in the pipeline
+    # Filters out elements which are across junction boundaries,
+    # otherwise asserts that there are no such elements.
     #
-    def _preflight(self):
-        for element in self.dependencies(Scope.ALL):
-            element._preflight()
-
-    # _resolve_cache_keys()
+    # This is currently assumed to be only relevant for element
+    # lists targetted at tracking.
     #
-    # Initially resolve the cache keys
+    # Args:
+    #    elements (list of Element): The list of elements to filter
+    #    cross_junction_requested (bool): Whether the user requested
+    #                                     cross junction tracking
     #
-    def _resolve_cache_keys(self):
-        track_elements = set(self._track_elements)
-
-        with self.context.timed_activity("Resolving cached state", silent_nested=True):
-            for element in self.dependencies(Scope.ALL):
-                if element in track_elements:
-                    # Load the pipeline in an explicitly inconsistent state, use
-                    # this for pipelines with tracking queues enabled.
-                    element._schedule_tracking()
+    # Returns:
+    #    (list of Element): The filtered or asserted result
+    #
+    def track_cross_junction_filter(self, elements, cross_junction_requested):
+        # Filter out cross junctioned elements
+        if cross_junction_requested:
+            self._assert_junction_tracking(elements)
+        else:
+            elements = self._filter_cross_junctions(elements)

-                # Determine initial element state. This may resolve cache keys
-                # and interrogate the artifact cache.
-                element._update_state()
+        return elements

-    # _assert_consistent()
+    # assert_consistent()
     #
-    # Asserts that the pipeline is in a consistent state, that
-    # is to say that all sources are consistent and can at least
-    # be fetched.
+    # Asserts that the given list of elements are in a consistent state, that
+    # is to say that all sources are consistent and can at least be fetched.
     #
     # Consequently it also means that cache keys can be resolved.
     #
-    def _assert_consistent(self, toplevel):
+    def assert_consistent(self, elements):
         inconsistent = []
-        with self.context.timed_activity("Checking sources"):
-            for element in toplevel:
+        with self._context.timed_activity("Checking sources"):
+            for element in elements:
                 if element._get_consistency() == Consistency.INCONSISTENT:
                     inconsistent.append(element)

@@ -375,6 +371,21 @@ class Pipeline():
                 detail += "  " + element._get_full_name() + "\n"
             raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")

+    # cleanup()
+    #
+    # Cleans up resources used by the Pipeline.
+    #
+    def cleanup(self):
+        if self._loader:
+            self._loader.cleanup()
+
+        # Reset the element loader state
+        Element._reset_load_state()
+
+    #############################################################
+    #                     Private Methods                       #
+    #############################################################
+
     # _filter_cross_junction()
     #
     # Filters out cross junction elements from the elements
@@ -389,7 +400,7 @@ class Pipeline():
     def _filter_cross_junctions(self, elements):
         return [
             element for element in elements
-            if element._get_project() is self.project
+            if element._get_project() is self._project
         ]

     # _assert_junction_tracking()
@@ -404,7 +415,7 @@ class Pipeline():

         # We can track anything if the toplevel project uses project.refs
         #
-        if self.project.ref_storage == ProjectRefStorage.PROJECT_REFS:
+        if self._project.ref_storage == ProjectRefStorage.PROJECT_REFS:
             return

         # Ideally, we would want to report every cross junction element but not
@@ -414,37 +425,19 @@ class Pipeline():
         # But this is too hard, lets shoot for a simple error.
         for element in elements:
             element_project = element._get_project()
-            if element_project is not self.project:
+            if element_project is not self._project:
                 detail = "Requested to track sources across junction boundaries\n" + \
                          "in a project which does not use project.refs ref-storage."

                 raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")

-    # _plan()
-    #
-    # Args:
-    #    except_ (bool): Whether to filter out the except elements from the plan
-    #
-    # Generator function to iterate over only the elements
-    # which are required to build the pipeline target, omitting
-    # cached elements. The elements are yielded in a depth sorted
-    # ordering for optimal build plans
-    def _plan(self, except_=True):
-        build_plan = _Planner().plan(self.targets)
-
-        if except_:
-            build_plan = self.remove_elements(build_plan)
-
-        for element in build_plan:
-            yield element
-
     # _message()
     #
     # Local message propagator
     #
     def _message(self, message_type, message, **kwargs):
         args = dict(kwargs)
-        self.context.message(
+        self._context.message(
             Message(None, message_type, message, **args))
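Note: Pipeline.load() above flattens all target groups into a single list for the Loader, then deals the resolved elements back out into groups of the original shapes. A standalone demonstration of that iterator trick, with strings standing in for Element objects:

    import itertools

    target_groups = [["a.bst", "b.bst"], [], ["c.bst"]]
    targets = list(itertools.chain(*target_groups))  # ['a.bst', 'b.bst', 'c.bst']

    # The loader yields one resolved element per target, in the same order
    elements = ["<Element {}>".format(t) for t in targets]

    elt_iter = iter(elements)
    element_groups = [
        [next(elt_iter) for _ in range(len(group))]
        for group in target_groups
    ]
    assert element_groups == [
        ["<Element a.bst>", "<Element b.bst>"], [], ["<Element c.bst>"]
    ]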
self._first_non_track_queue = None + self._session_start_callback = session_start_callback # cleanup() # @@ -81,14 +105,18 @@ class Stream(): # except_targets (list of str): Specified targets to except from fetching # downloadable (bool): Whether the downloadable state of elements should be resolved # + # Returns: + # (list of Element): The selected elements def load_selection(self, targets, *, selection=PipelineSelection.NONE, except_targets=(), downloadable=False): - self.init_pipeline(targets, except_=except_targets, - use_configured_remote_caches=downloadable, - fetch_subprojects=False) - return self._pipeline.get_selection(selection) + elements, _ = self._load(targets, (), + selection=selection, + except_targets=except_targets, + use_artifact_config=downloadable, + fetch_subprojects=False) + return elements # shell() # @@ -118,7 +146,7 @@ class Stream(): if directory is None: missing_deps = [ dep._get_full_name() - for dep in self._pipeline.dependencies(scope) + for dep in self._pipeline.dependencies([element], scope) if not dep._cached() ] if missing_deps: @@ -145,66 +173,47 @@ class Stream(): track_cross_junctions=False, build_all=False): - rewritable = False - if track_targets: - rewritable = True + if build_all or track_targets: + selection = PipelineSelection.ALL + else: + selection = PipelineSelection.PLAN - self.init_pipeline(targets, - except_=track_except, - rewritable=rewritable, - use_configured_remote_caches=True, - track_elements=track_targets, - track_cross_junctions=track_cross_junctions, - fetch_subprojects=True) + elements, track_elements = \ + self._load(targets, track_targets, + selection=selection, track_selection=PipelineSelection.ALL, + track_except_targets=track_except, + track_cross_junctions=track_cross_junctions, + use_artifact_config=True, + fetch_subprojects=True) - if build_all: - plan = self._pipeline.dependencies(Scope.ALL) - else: - plan = self._pipeline._plan(except_=False) - - # We want to start the build queue with any elements that are - # not being tracked first - track_elements = set(self._pipeline._track_elements) - plan = [e for e in plan if e not in track_elements] - - # Assert that we have a consistent pipeline now (elements in - # track_plan will be made consistent) - self._pipeline._assert_consistent(plan) - - fetch = FetchQueue(self._scheduler, skip_cached=True) - build = BuildQueue(self._scheduler) - track = None - pull = None - push = None - queues = [] - if self._pipeline._track_elements: - track = TrackQueue(self._scheduler) - queues.append(track) - if self._pipeline._artifacts.has_fetch_remotes(): - pull = PullQueue(self._scheduler) - queues.append(pull) - queues.append(fetch) - queues.append(build) - if self._pipeline._artifacts.has_push_remotes(): - push = PushQueue(self._scheduler) - queues.append(push) - - # If we're going to track, tracking elements go into the first queue - # which is the tracking queue, the rest of the plan goes into the next - # queue (whatever that happens to be) - if track: - queues[0].enqueue(self._pipeline._track_elements) - queues[1].enqueue(plan) - else: - queues[0].enqueue(plan) + # Remove the tracking elements from the main targets + elements = self._pipeline.subtract_elements(elements, track_elements) - self.session_elements = len(self._pipeline._track_elements) + len(plan) + # Assert that the elements we're not going to track are consistent + self._pipeline.assert_consistent(elements) - _, status = self._scheduler.run(queues) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == 
SchedStatus.TERMINATED: - raise StreamError(terminated=True) + # Now construct the queues + # + track_queue = None + if track_elements: + track_queue = TrackQueue(self._scheduler) + self._add_queue(track_queue, track=True) + + if self._artifacts.has_fetch_remotes(): + self._add_queue(PullQueue(self._scheduler)) + + self._add_queue(FetchQueue(self._scheduler, skip_cached=True)) + self._add_queue(BuildQueue(self._scheduler)) + + if self._artifacts.has_push_remotes(): + self._add_queue(PushQueue(self._scheduler)) + + # Enqueue elements + # + if track_elements: + self._enqueue_plan(track_elements, queue=track_queue) + self._enqueue_plan(elements) + self._run() # fetch() # @@ -223,21 +232,25 @@ class Stream(): track_targets=False, track_cross_junctions=False): - rewritable = False if track_targets: - rewritable = True - - self.init_pipeline(targets, - except_=except_targets, - rewritable=rewritable, - track_elements=targets if track_targets else None, - track_cross_junctions=track_cross_junctions, - fetch_subprojects=True) + track_targets = targets + track_selection = selection + track_except_targets = except_targets + else: + track_targets = () + track_selection = PipelineSelection.NONE + track_except_targets = () - fetch_plan = self._pipeline.get_selection(selection) + elements, track_elements = \ + self._load(targets, track_targets, + selection=selection, track_selection=track_selection, + except_targets=except_targets, + track_except_targets=track_except_targets, + track_cross_junctions=track_cross_junctions, + fetch_subprojects=True) - # Delegated to a shared method for now - self._do_fetch(fetch_plan) + # Delegated to a shared fetch method + self._fetch(elements, track_elements=track_elements) # track() # @@ -255,26 +268,20 @@ class Stream(): def track(self, targets, *, selection=PipelineSelection.NONE, except_targets=None, - track_targets=False, cross_junctions=False): - self.init_pipeline(targets, - except_=except_targets, - rewritable=True, - track_elements=targets, - track_cross_junctions=cross_junctions, - track_selection=selection, - fetch_subprojects=True) + _, elements = \ + self._load(targets, targets, + selection=selection, track_selection=selection, + except_targets=except_targets, + track_except_targets=except_targets, + track_cross_junctions=cross_junctions, + fetch_subprojects=True) - track = TrackQueue(self._scheduler) - track.enqueue(self._pipeline._track_elements) - self.session_elements = len(self._pipeline._track_elements) - - _, status = self._scheduler.run([track]) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + track_queue = TrackQueue(self._scheduler) + self._add_queue(track_queue, track=True) + self._enqueue_plan(elements, queue=track_queue) + self._run() # pull() # @@ -292,33 +299,23 @@ class Stream(): selection=PipelineSelection.NONE, remote=None): - use_configured_remote_caches = True - if remote is not None: - use_configured_remote_caches = False + use_config = True + if remote: + use_config = False - self.init_pipeline(targets, - use_configured_remote_caches=use_configured_remote_caches, - add_remote_cache=remote, - fetch_subprojects=True) + elements, _ = self._load(targets, (), + selection=selection, + use_artifact_config=use_config, + artifact_remote_url=remote, + fetch_subprojects=True) - elements = self._pipeline.get_selection(selection) - - if not self._pipeline._artifacts.has_fetch_remotes(): + if not self._artifacts.has_fetch_remotes(): raise StreamError("No artifact 
caches available for pulling artifacts") - plan = elements - self._pipeline._assert_consistent(plan) - self._pipeline.session_elements = len(plan) - - pull = PullQueue(self._scheduler) - pull.enqueue(plan) - queues = [pull] - - _, status = self._scheduler.run(queues) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + self._pipeline.assert_consistent(elements) + self._add_queue(PullQueue(self._scheduler)) + self._enqueue_plan(elements) + self._run() # push() # @@ -336,33 +333,23 @@ class Stream(): selection=PipelineSelection.NONE, remote=None): - use_configured_remote_caches = True - if remote is not None: - use_configured_remote_caches = False - - self.init_pipeline(targets, - use_configured_remote_caches=use_configured_remote_caches, - add_remote_cache=remote, - fetch_subprojects=True) + use_config = True + if remote: + use_config = False - elements = self._pipeline.get_selection(selection) + elements, _ = self._load(targets, (), + selection=selection, + use_artifact_config=use_config, + artifact_remote_url=remote, + fetch_subprojects=True) - if not self._pipeline._artifacts.has_push_remotes(): + if not self._artifacts.has_push_remotes(): raise StreamError("No artifact caches available for pushing artifacts") - plan = elements - self._pipeline._assert_consistent(plan) - self._pipeline.session_elements = len(plan) - - push = PushQueue(self._scheduler) - push.enqueue(plan) - queues = [push] - - _, status = self._scheduler.run(queues) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + self._pipeline.assert_consistent(elements) + self._add_queue(PushQueue(self._scheduler)) + self._enqueue_plan(elements) + self._run() # checkout() # @@ -382,10 +369,9 @@ class Stream(): integrate=True, hardlinks=False): - self.init_pipeline((target,), fetch_subprojects=True) - # We only have one target in a checkout command - target = self._pipeline.targets[0] + elements, _ = self._load((target,), (), fetch_subprojects=True) + target = elements[0] try: os.makedirs(directory, exist_ok=True) @@ -433,13 +419,13 @@ class Stream(): track_first, force): - self.init_pipeline((target,), - track_elements=[target] if track_first else None, - track_selection=PipelineSelection.NONE, - rewritable=track_first, - fetch_subprojects=False) + if track_first: + track_targets = (target,) + else: + track_targets = () - target = self._pipeline.targets[0] + elements, track_elements = self._load((target,), track_targets) + target = elements[0] workdir = os.path.abspath(directory) if not list(target.sources()): @@ -459,11 +445,11 @@ class Stream(): # If we're going to checkout, we need at least a fetch, # if we were asked to track first, we're going to fetch anyway. # - # For now, tracking is handled by _do_fetch() automatically - # by virtue of our screwed up pipeline initialization stuff. - # if not no_checkout or track_first: - self._do_fetch([target]) + track_elements = [] + if track_first: + track_elements = elements + self._fetch(elements, track_elements=track_elements) if not no_checkout and target._get_consistency() != Consistency.CACHED: raise StreamError("Could not stage uncached source. 
" + @@ -522,34 +508,35 @@ class Stream(): # def workspace_reset(self, targets, *, track_first): - self.init_pipeline(targets, - track_elements=targets if track_first else None, - track_selection=PipelineSelection.NONE, - rewritable=track_first, - fetch_subprojects=False) + if track_first: + track_targets = targets + else: + track_targets = () + + elements, track_elements = self._load(targets, track_targets) # Do the tracking first if track_first: - self._do_fetch(self._pipeline.targets) + self._fetch(elements, track_elements=track_elements) - for target in self._pipeline.targets: - workspace = self._project.workspaces.get_workspace(target.name) + for element in elements: + workspace = self._project.workspaces.get_workspace(element.name) - with target.timed_activity("Removing workspace directory {}" - .format(workspace.path)): + with element.timed_activity("Removing workspace directory {}" + .format(workspace.path)): try: shutil.rmtree(workspace.path) except OSError as e: raise StreamError("Could not remove '{}': {}" .format(workspace.path, e)) from e - self._project.workspaces.delete_workspace(target.name) - self._project.workspaces.create_workspace(target.name, workspace.path) + self._project.workspaces.delete_workspace(element.name) + self._project.workspaces.create_workspace(element.name, workspace.path) - with target.timed_activity("Staging sources to {}".format(workspace.path)): - target._open_workspace() + with element.timed_activity("Staging sources to {}".format(workspace.path)): + element._open_workspace() - self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path)) + self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace.path)) self._project.workspaces.save_config() @@ -609,15 +596,20 @@ class Stream(): force=False, compression="gz"): - self.init_pipeline((target,), - track_elements=[target] if track_first else None, - track_selection=PipelineSelection.NONE, - rewritable=track_first, - fetch_subprojects=True) + if track_first: + track_targets = (target,) + else: + track_targets = () + + elements, track_elements = self._load((target,), track_targets, + selection=PipelineSelection.ALL, + track_selection=PipelineSelection.ALL, + fetch_subprojects=True) # source-bundle only supports one target - target = self._pipeline.targets[0] - dependencies = self._pipeline.get_selection(PipelineSelection.ALL) + target = self.targets[0] + + self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name)) # Find the correct filename for the compression algorithm tar_location = os.path.join(directory, target.normal_name + ".tar") @@ -635,14 +627,15 @@ class Stream(): raise StreamError("Cannot write to {0}: {1}" .format(tar_location, e)) from e - plan = list(dependencies) - self._do_fetch(plan) + # Fetch and possibly track first + # + self._fetch(elements, track_elements=track_elements) # We don't use the scheduler for this as it is almost entirely IO # bound. # Create a temporary directory to build the source tree in - builddir = target._get_context().builddir + builddir = self._context.builddir prefix = "{}-".format(target.normal_name) with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir: @@ -655,18 +648,162 @@ class Stream(): # Any elements that don't implement _write_script # should not be included in the later stages. 
- plan = [element for element in plan - if self._write_element_script(source_directory, element)] + elements = [ + element for element in elements + if self._write_element_script(source_directory, element) + ] - self._write_element_sources(tempdir, plan) - self._write_build_script(tempdir, plan) + self._write_element_sources(tempdir, elements) + self._write_build_script(tempdir, elements) self._collect_sources(tempdir, tar_location, target.normal_name, compression) ############################################################# - # Private Methods # + # Scheduler API forwarding # + ############################################################# + + # running + # + # Whether the scheduler is running + # + @property + def running(self): + return self._scheduler.loop is not None + + # suspended + # + # Whether the scheduler is currently suspended + # + @property + def suspended(self): + return self._scheduler.suspended + + # terminated + # + # Whether the scheduler is currently terminated + # + @property + def terminated(self): + return self._scheduler.terminated + + # elapsed_time + # + # Elapsed time since the session start + # + @property + def elapsed_time(self): + return self._scheduler.elapsed_time() + + # terminate() + # + # Terminate jobs + # + def terminate(self): + self._scheduler.terminate_jobs() + + # quit() + # + # Quit the session, this will continue with any ongoing + # jobs, use Stream.terminate() instead for cancellation + # of ongoing jobs + # + def quit(self): + self._scheduler.stop_queueing() + + # suspend() + # + # Context manager to suspend ongoing jobs + # + @contextmanager + def suspend(self): + with self._scheduler.jobs_suspended(): + yield + + ############################################################# + # Private Methods # ############################################################# + # _load() + # + # A convenience method for loading element lists + # + # Args: + # targets (list of str): Main targets to load + # track_targets (list of str): Tracking targets + # selection (PipelineSelection): The selection mode for the specified targets + # track_selection (PipelineSelection): The selection mode for the specified tracking targets + # except_targets (list of str): Specified targets to except from fetching + # track_except_targets (list of str): Specified targets to except from fetching + # track_cross_junctions (bool): Whether tracking should cross junction boundaries + # use_artifact_config (bool): Whether to initialize artifacts with the config + # artifact_remote_url (bool): A remote url for initializing the artifacts + # fetch_subprojects (bool): Whether to fetch subprojects while loading + # + # Returns: + # (list of Element): The primary element selection + # (list of Element): The tracking element selection + # + def _load(self, targets, track_targets, *, + selection=PipelineSelection.NONE, + track_selection=PipelineSelection.NONE, + except_targets=(), + track_except_targets=(), + track_cross_junctions=False, + use_artifact_config=False, + artifact_remote_url=None, + fetch_subprojects=False): + + # Load rewritable if we have any tracking selection to make + rewritable = False + if track_targets: + rewritable = True + + # Load all targets + elements, except_elements, track_elements, track_except_elements = \ + self._pipeline.load([targets, except_targets, track_targets, track_except_targets], + rewritable=rewritable, + fetch_subprojects=fetch_subprojects) + + # Hold on to the targets + self.targets = elements + + # Here we should raise an error if the 
track_elements targets + # are not dependencies of the primary targets, this is not + # supported. + # + # This can happen with `bst build --track` + # + if not self._pipeline.targets_include(elements, track_elements): + raise StreamError("Specified tracking targets that are not " + "within the scope of primary targets") + + # First take care of marking tracking elements, this must be + # done before resolving element states. + # + assert track_selection != PipelineSelection.PLAN + track_selected = self._pipeline.get_selection(track_elements, track_selection) + track_selected = self._pipeline.except_elements(track_elements, + track_selected, + track_except_elements) + track_selected = self._pipeline.track_cross_junction_filter(track_selected, + track_cross_junctions) + + for element in track_selected: + element._schedule_tracking() + + # Connect to remote caches, this needs to be done before resolving element state + self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url) + + # Now move on to loading primary selection. + # + self._pipeline.resolve_elements(elements) + selected = self._pipeline.get_selection(elements, selection) + selected = self._pipeline.except_elements(elements, + selected, + except_elements) + + return selected, track_selected + # _message() # # Local message propagator @@ -676,47 +813,103 @@ class Stream(): self._context.message( Message(None, message_type, message, **args)) - # _do_fetch() + # _add_queue() + # + # Adds a queue to the stream + # + # Args: + # queue (Queue): Queue to add to the pipeline + # track (bool): Whether this is the tracking queue + # + def _add_queue(self, queue, *, track=False): + self.queues.append(queue) + + if not (track or self._first_non_track_queue): + self._first_non_track_queue = queue + + # _enqueue_plan() + # + # Enqueues planned elements to the specified queue. + # + # Args: + # plan (list of Element): The list of elements to be enqueued + # queue (Queue): The target queue, defaults to the first non-track queue + # + def _enqueue_plan(self, plan, *, queue=None): + queue = queue or self._first_non_track_queue + + queue.enqueue(plan) + self.session_elements += plan + + # _run() + # + # Common function for running the scheduler + # + def _run(self): + + # Inform the frontend of the full list of elements + # and the list of elements which will be processed in this run + # + self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL)) + + if self._session_start_callback is not None: + self._session_start_callback() + + _, status = self._scheduler.run(self.queues) + + # Force update element states after a run, such that the summary + # is more coherent + try: + for element in self.total_elements: + element._update_state() + except BstError as e: + self._message(MessageType.ERROR, "Error resolving final state", detail=str(e)) + set_last_task_error(e.domain, e.reason) + except Exception as e: # pylint: disable=broad-except + self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e)) + + if status == SchedStatus.ERROR: + raise StreamError() + elif status == SchedStatus.TERMINATED: + raise StreamError(terminated=True) + + # _fetch() # # Performs the fetch job, the body of this function is here because # it is shared between a few internals. 
# # Args: # elements (list of Element): Elements to fetch + # track_elements (list of Element): Elements to track # - def _do_fetch(self, elements): + def _fetch(self, elements, *, track_elements=None): - fetch_plan = elements + if track_elements is None: + track_elements = [] # Subtract the track elements from the fetch elements, they will be added separately - if self._pipeline._track_elements: - track_elements = set(self._pipeline._track_elements) - fetch_plan = [e for e in fetch_plan if e not in track_elements] + fetch_plan = self._pipeline.subtract_elements(elements, track_elements) # Assert consistency for the fetch elements - self._pipeline._assert_consistent(fetch_plan) + self._pipeline.assert_consistent(fetch_plan) # Filter out elements with cached sources, only from the fetch plan # let the track plan resolve new refs. cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED] - fetch_plan = [elt for elt in fetch_plan if elt not in cached] - - self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan) + fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached) - fetch = FetchQueue(self._scheduler) - fetch.enqueue(fetch_plan) - if self._pipeline._track_elements: - track = TrackQueue(self._scheduler) - track.enqueue(self._pipeline._track_elements) - queues = [track, fetch] - else: - queues = [fetch] + # Construct queues, enqueue and run + # + track_queue = None + if track_elements: + track_queue = TrackQueue(self._scheduler) + self._add_queue(track_queue, track=True) + self._add_queue(FetchQueue(self._scheduler)) - _, status = self._scheduler.run(queues) - if status == SchedStatus.ERROR: - raise StreamError() - elif status == SchedStatus.TERMINATED: - raise StreamError(terminated=True) + if track_elements: + self._enqueue_plan(track_elements, queue=track_queue) + self._enqueue_plan(fetch_plan) + self._run() # Helper function for checkout() # @@ -772,7 +965,7 @@ class Stream(): # Collect the sources in the given sandbox into a tarfile def _collect_sources(self, directory, tar_name, element_name, compression): - with self._pipeline.targets[0].timed_activity("Creating tarball {}".format(tar_name)): + with self._context.timed_activity("Creating tarball {}".format(tar_name)): if compression == "none": permissions = "w:" else: @@ -780,64 +973,3 @@ class Stream(): with tarfile.open(tar_name, permissions) as tar: tar.add(directory, arcname=element_name) - - ############################################################# - # TEMPORARY CRAP # - ############################################################# - - # init_pipeline() - # - # Initialize the pipeline for a given activity - # - # Args: - # elements (list of elements): The elements to load recursively - # except_ (list of elements): The elements to except - # rewritable (bool): Whether we should load the YAML files for roundtripping - # use_configured_remote_caches (bool): Whether we should contact remotes - # add_remote_cache (str): The URL for an explicitly mentioned remote cache - # track_elements (list of elements): Elements which are to be tracked - # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries - # track_selection (PipelineSelection): The selection algorithm for track elements - # fetch_subprojects (bool): Whether to fetch subprojects while loading - # - # Note that the except_ argument may have a subtly different meaning depending - # on the activity performed on the Pipeline. 
-
-    #############################################################
-    #                     TEMPORARY CRAP                        #
-    #############################################################
-
-    # init_pipeline()
-    #
-    # Initialize the pipeline for a given activity
-    #
-    # Args:
-    #    elements (list of elements): The elements to load recursively
-    #    except_ (list of elements): The elements to except
-    #    rewritable (bool): Whether we should load the YAML files for roundtripping
-    #    use_configured_remote_caches (bool): Whether we should contact remotes
-    #    add_remote_cache (str): The URL for an explicitly mentioned remote cache
-    #    track_elements (list of elements): Elements which are to be tracked
-    #    track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
-    #    track_selection (PipelineSelection): The selection algorithm for track elements
-    #    fetch_subprojects (bool): Whether to fetch subprojects while loading
-    #
-    # Note that the except_ argument may have a subtly different meaning depending
-    # on the activity performed on the Pipeline. In normal circumstances the except_
-    # argument excludes elements from the `elements` list. In a build session, the
-    # except_ elements are excluded from the tracking plan.
-    #
-    def init_pipeline(self, elements, *,
-                      except_=tuple(),
-                      rewritable=False,
-                      use_configured_remote_caches=False,
-                      add_remote_cache=None,
-                      track_elements=None,
-                      track_cross_junctions=False,
-                      track_selection=PipelineSelection.ALL,
-                      fetch_subprojects=True):
-
-        profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
-        self._pipeline = Pipeline(self._context, self._project, self._artifacts,
-                                  elements, except_,
-                                  rewritable=rewritable,
-                                  fetch_subprojects=fetch_subprojects)
-
-        # After loading the projects, but before resolving cache keys,
-        # we need to initialize remote artifact caches where relevant
-        #
-        self._artifacts.setup_remotes(use_config=use_configured_remote_caches,
-                                      remote_url=add_remote_cache)
-
-        # Now complete the initialization
-        #
-        self._pipeline.initialize(track_elements=track_elements,
-                                  track_cross_junctions=track_cross_junctions,
-                                  track_selection=track_selection)
-
-        profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
-        # Get the total
-        self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
-
-        if self._loaded_cb is not None:
-            self._loaded_cb(self._pipeline)
diff --git a/tests/frontend/buildtrack.py b/tests/frontend/buildtrack.py
index 84d543e52..3f0a3adbe 100644
--- a/tests/frontend/buildtrack.py
+++ b/tests/frontend/buildtrack.py
@@ -31,40 +31,24 @@ def create_element(repo, name, path, dependencies, ref=None):
 
 @pytest.mark.datafiles(os.path.join(DATA_DIR))
 @pytest.mark.parametrize("ref_storage", [('inline'), ('project.refs')])
-@pytest.mark.parametrize("exceptions,excepted", [
+@pytest.mark.parametrize("track_targets,exceptions,tracked", [
     # Test with no exceptions
-    ([], []),
+    (['0.bst'], [], ['0.bst', '2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
+    (['3.bst'], [], ['3.bst', '4.bst', '5.bst', '6.bst']),
+    (['2.bst', '3.bst'], [], ['2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
 
     # Test excepting '2.bst'
-    (['2.bst'], ['2.bst', '7.bst']),
+    (['0.bst'], ['2.bst'], ['0.bst', '3.bst', '4.bst', '5.bst', '6.bst']),
+    (['3.bst'], ['2.bst'], []),
+    (['2.bst', '3.bst'], ['2.bst'], ['3.bst', '4.bst', '5.bst', '6.bst']),
 
     # Test excepting '2.bst' and '3.bst'
-    (['2.bst', '3.bst'], [
-        '2.bst', '3.bst', '4.bst',
-        '5.bst', '6.bst', '7.bst'
-    ])
+    (['0.bst'], ['2.bst', '3.bst'], ['0.bst']),
+    (['3.bst'], ['2.bst', '3.bst'], []),
+    (['2.bst', '3.bst'], ['2.bst', '3.bst'], [])
 ])
-@pytest.mark.parametrize("track_targets,tracked", [
-    # Test tracking the main target element
-    (['0.bst'], [
-        '0.bst', '2.bst', '3.bst',
-        '4.bst', '5.bst', '6.bst', '7.bst'
-    ]),
-
-    # Test tracking a child element
-    (['3.bst'], [
-        '3.bst', '4.bst', '5.bst',
-        '6.bst'
-    ]),
-
-    # Test tracking multiple children
-    (['2.bst', '3.bst'], [
-        '2.bst', '3.bst', '4.bst',
-        '5.bst', '6.bst', '7.bst'
-    ])
-])
-def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
-                     exceptions, tracked, excepted):
+def test_build_track(cli, datafiles, tmpdir, ref_storage,
+                     track_targets, exceptions, tracked):
     project = os.path.join(datafiles.dirname, datafiles.basename)
     dev_files_path = os.path.join(project, 'files', 'dev-files')
     element_path = os.path.join(project, 'elements')
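The parametrize change above replaces two stacked decorators with a single combined table. Stacked parametrize decorators make pytest run the full cross-product of their cases, which is why the old test needed a separate excepted set to subtract afterwards; a single table runs exactly the rows it lists, so each row can carry its own expected tracked set. A minimal illustration of the difference:

```python
import pytest

# Two stacked decorators expand to the cross-product of their cases,
# here 2 x 2 = 4 test invocations ...
@pytest.mark.parametrize("a", [1, 2])
@pytest.mark.parametrize("b", ["x", "y"])
def test_cross_product(a, b):
    assert (a, b) in {(1, "x"), (1, "y"), (2, "x"), (2, "y")}


# ... while one combined decorator runs exactly the rows it lists,
# letting each row carry its own expected result.
@pytest.mark.parametrize("a,b,expected", [
    (1, "x", "1x"),
    (2, "y", "2y"),
])
def test_explicit_rows(a, b, expected):
    assert "{}{}".format(a, b) == expected
```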
@@ -102,7 +86,7 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
     for element, dependencies in create_elements.items():
         # Test the element inconsistency resolution by ensuring that
         # only elements that aren't tracked have refs
-        if element in set(tracked) - set(excepted):
+        if element in set(tracked):
             # Elements which should not have a ref set
             #
             create_element(repo, element, element_path, dependencies)
@@ -133,14 +117,14 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
     result = cli.run(project=project, silent=True, args=args)
 
     tracked_elements = result.get_tracked_elements()
-    assert set(tracked_elements) == set(tracked) - set(excepted)
+    assert set(tracked_elements) == set(tracked)
 
     # Delete element sources
     source_dir = os.path.join(project, 'cache', 'sources')
     shutil.rmtree(source_dir)
 
     # Delete artifacts one by one and assert element states
-    for target in set(tracked) - set(excepted):
+    for target in set(tracked):
         cli.remove_artifact_from_cache(project, target)
 
         # Assert that it's tracked
@@ -154,40 +138,24 @@
 
 @pytest.mark.datafiles(os.path.join(DATA_DIR))
-@pytest.mark.parametrize("exceptions,excepted", [
+@pytest.mark.parametrize("track_targets,exceptions,tracked", [
     # Test with no exceptions
-    ([], []),
+    (['0.bst'], [], ['0.bst', '2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
+    (['3.bst'], [], ['3.bst', '4.bst', '5.bst', '6.bst']),
+    (['2.bst', '3.bst'], [], ['2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
 
     # Test excepting '2.bst'
-    (['2.bst'], ['2.bst', '7.bst']),
+    (['0.bst'], ['2.bst'], ['0.bst', '3.bst', '4.bst', '5.bst', '6.bst']),
+    (['3.bst'], ['2.bst'], []),
+    (['2.bst', '3.bst'], ['2.bst'], ['3.bst', '4.bst', '5.bst', '6.bst']),
 
     # Test excepting '2.bst' and '3.bst'
-    (['2.bst', '3.bst'], [
-        '2.bst', '3.bst', '4.bst',
-        '5.bst', '6.bst', '7.bst'
-    ])
-])
-@pytest.mark.parametrize("track_targets,tracked", [
-    # Test tracking the main target element
-    (['0.bst'], [
-        '0.bst', '2.bst', '3.bst',
-        '4.bst', '5.bst', '6.bst', '7.bst'
-    ]),
-
-    # Test tracking a child element
-    (['3.bst'], [
-        '3.bst', '4.bst', '5.bst',
-        '6.bst'
-    ]),
-
-    # Test tracking multiple children
-    (['2.bst', '3.bst'], [
-        '2.bst', '3.bst', '4.bst',
-        '5.bst', '6.bst', '7.bst'
-    ])
+    (['0.bst'], ['2.bst', '3.bst'], ['0.bst']),
+    (['3.bst'], ['2.bst', '3.bst'], []),
+    (['2.bst', '3.bst'], ['2.bst', '3.bst'], [])
 ])
 def test_build_track_update(cli, datafiles, tmpdir, track_targets,
-                            exceptions, tracked, excepted):
+                            exceptions, tracked):
     project = os.path.join(datafiles.dirname, datafiles.basename)
     dev_files_path = os.path.join(project, 'files', 'dev-files')
     element_path = os.path.join(project, 'elements')
@@ -231,7 +199,7 @@ def test_build_track_update(cli, datafiles, tmpdir, track_targets,
     result = cli.run(project=project, silent=True, args=args)
 
     tracked_elements = result.get_tracked_elements()
-    assert set(tracked_elements) == set(tracked) - set(excepted)
+    assert set(tracked_elements) == set(tracked)
 
 
 @pytest.mark.datafiles(os.path.join(DATA_DIR))
diff --git a/tests/plugins/pipeline.py b/tests/plugins/pipeline.py
index db683094b..65929cf50 100644
--- a/tests/plugins/pipeline.py
+++ b/tests/plugins/pipeline.py
@@ -23,23 +23,25 @@ def create_pipeline(tmpdir, basedir, target):
 
     context.set_message_handler(dummy_handler)
 
-    return Pipeline(context, project, None, [target], [])
+    pipeline = Pipeline(context, project, None)
+    targets, = pipeline.load([(target,)])
+    return targets
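The create_pipeline() change shows the new Pipeline.load() calling convention: it apparently takes a list of target groups and returns one list of elements per group, so the trailing comma in "targets, = pipeline.load([(target,)])" unpacks the single requested group. A self-contained illustration of the unpacking idiom; load() below is a stand-in, not the real method:

```python
# Stand-in for Pipeline.load(): the real method resolves element names to
# Element objects, here we simply echo each group back as a list.
def load(target_groups):
    return [list(group) for group in target_groups]

# A one-name tuple on the left unpacks the single returned group
targets, = load([('simple.bst',)])
assert targets == ['simple.bst']

# With several groups, each gets its own name
build_targets, track_targets = load([('app.bst',), ('base.bst', 'tools.bst')])
assert track_targets == ['base.bst', 'tools.bst']
```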
 
 
 @pytest.mark.datafiles(os.path.join(DATA_DIR, 'customsource'))
 def test_customsource(datafiles, tmpdir):
     basedir = os.path.join(datafiles.dirname, datafiles.basename)
 
-    pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
-    assert(pipeline.targets[0].get_kind() == "autotools")
+    targets = create_pipeline(tmpdir, basedir, 'simple.bst')
+    assert(targets[0].get_kind() == "autotools")
 
 
 @pytest.mark.datafiles(os.path.join(DATA_DIR, 'customelement'))
 def test_customelement(datafiles, tmpdir):
     basedir = os.path.join(datafiles.dirname, datafiles.basename)
 
-    pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
-    assert(pipeline.targets[0].get_kind() == "foo")
+    targets = create_pipeline(tmpdir, basedir, 'simple.bst')
+    assert(targets[0].get_kind() == "foo")
 
 
 @pytest.mark.datafiles(os.path.join(DATA_DIR, 'badversionsource'))
@@ -47,7 +49,7 @@ def test_badversionsource(datafiles, tmpdir):
     basedir = os.path.join(datafiles.dirname, datafiles.basename)
 
     with pytest.raises(LoadError) as exc:
-        pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
+        targets = create_pipeline(tmpdir, basedir, 'simple.bst')
 
     assert exc.value.reason == LoadErrorReason.UNSUPPORTED_PLUGIN
 
@@ -57,6 +59,6 @@
     basedir = os.path.join(datafiles.dirname, datafiles.basename)
 
     with pytest.raises(LoadError) as exc:
-        pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
+        targets = create_pipeline(tmpdir, basedir, 'simple.bst')
 
     assert exc.value.reason == LoadErrorReason.UNSUPPORTED_PLUGIN
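Finally, the _stream.py hunks earlier in this patch lean on two new Pipeline helpers, subtract_elements() and targets_include(), whose behaviour can be read off their call sites: one removes a set of elements from a list, the other checks that the tracking targets fall within the scope of the primary targets. Plausible sketches under those assumptions; the real implementations live in buildstream/_pipeline.py and may differ:

```python
# Sketches inferred from the call sites in _stream.py; not the actual
# BuildStream implementations.
def subtract_elements(elements, subtract):
    # Return 'elements' with every member of 'subtract' removed,
    # preserving the original order
    subtract_set = set(subtract)
    return [e for e in elements if e not in subtract_set]


def targets_include(scope, elements):
    # True only if every element falls within the given scope.  In the
    # real helper the scope is presumably the full dependency closure
    # (Scope.ALL) of the targets; a plain set stands in for it here.
    scope_set = set(scope)
    return all(e in scope_set for e in elements)


assert subtract_elements(['a', 'b', 'c'], ['b']) == ['a', 'c']
assert targets_include(['a', 'b', 'c'], ['b', 'c'])
assert not targets_include(['a', 'b'], ['z'])
```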