summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildstream/_frontend/app.py70
-rw-r--r--buildstream/_frontend/status.py25
-rw-r--r--buildstream/_frontend/widget.py37
-rw-r--r--buildstream/_pipeline.py369
-rw-r--r--buildstream/_stream.py688
-rw-r--r--tests/frontend/buildtrack.py86
-rw-r--r--tests/plugins/pipeline.py16
7 files changed, 685 insertions, 606 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 58a8eab89..de23c12b0 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -39,7 +39,6 @@ from .._project import Project
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
-from .._scheduler import Scheduler
from .._versions import BST_FORMAT_VERSION
from .. import __version__ as build_stream_version
from .. import _yaml
@@ -69,7 +68,6 @@ class App():
self.context = None # The Context object
self.stream = None # The Stream object
self.project = None # The toplevel Project object
- self.scheduler = None # The Scheduler
self.logger = None # The LogLine object
self.interactive = None # Whether we are running in interactive mode
self.colors = None # Whether to use colors in logging
@@ -83,6 +81,7 @@ class App():
self._status = None # The Status object
self._fail_messages = {} # Failure messages by unique plugin id
self._interactive_failures = None # Whether to handle failures interactively
+ self._started = False # Whether a session has started
# UI Colors Profiles
self._content_profile = Profile(fg='yellow')
@@ -202,14 +201,12 @@ class App():
self._error_exit(e, "Error loading project")
# Create the stream right away, we'll need to pass it around
- self.stream = Stream(self.context, self.project, self.loaded_cb)
-
- # Create the application's scheduler
- self.scheduler = Scheduler(self.context, self._session_start,
- interrupt_callback=self._interrupt_handler,
- ticker_callback=self._tick,
- job_start_callback=self._job_started,
- job_complete_callback=self._job_completed)
+ self.stream = Stream(self.context, self.project, self._session_start,
+ session_start_callback=self.session_start_cb,
+ interrupt_callback=self._interrupt_handler,
+ ticker_callback=self._tick,
+ job_start_callback=self._job_started,
+ job_complete_callback=self._job_completed)
# Create the logger right before setting the message handler
self.logger = LogLine(self.context,
@@ -224,8 +221,7 @@ class App():
self._status = Status(self.context,
self._content_profile, self._format_profile,
self._success_profile, self._error_profile,
- self.stream, self.scheduler,
- colors=self.colors)
+ self.stream, colors=self.colors)
# Propagate pipeline feedback to the user
self.context.set_message_handler(self._message_handler)
@@ -238,10 +234,6 @@ class App():
if session_name:
self._message(MessageType.START, session_name)
- # XXX This is going to change soon !
- #
- self.stream._scheduler = self.scheduler
-
# Run the body of the session here, once everything is loaded
try:
yield
@@ -249,14 +241,15 @@ class App():
# Print a nice summary if this is a session
if session_name:
- elapsed = self.scheduler.elapsed_time()
+ elapsed = self.stream.elapsed_time
if isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member
self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
else:
self._message(MessageType.FAIL, session_name, elapsed=elapsed)
- self._print_summary()
+ if self._started:
+ self._print_summary()
# Exit with the error
self._error_exit(e)
@@ -264,8 +257,9 @@ class App():
else:
# No exceptions occurred, print session time and summary
if session_name:
- self._message(MessageType.SUCCESS, session_name, elapsed=self.scheduler.elapsed_time())
- self._print_summary()
+ self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+ if self._started:
+ self._print_summary()
# init_project()
#
@@ -400,8 +394,8 @@ class App():
# If the scheduler has started, try to terminate all jobs gracefully,
# otherwise exit immediately.
- if self.scheduler.loop:
- self.scheduler.terminate_jobs()
+ if self.stream.running:
+ self.stream.terminate()
else:
sys.exit(-1)
@@ -411,8 +405,8 @@ class App():
def _maybe_render_status(self):
# If we're suspended or terminating, then dont render the status area
- if self._status and self.scheduler and \
- not (self.scheduler.suspended or self.scheduler.terminated):
+ if self._status and self.stream and \
+ not (self.stream.suspended or self.stream.terminated):
self._status.render()
#
@@ -423,7 +417,7 @@ class App():
# Only handle ^C interactively in interactive mode
if not self.interactive:
self._status.clear()
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
return
# Here we can give the user some choices, like whether they would
@@ -452,11 +446,11 @@ class App():
if choice == 'terminate':
click.echo("\nTerminating all jobs at user request\n", err=True)
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
else:
if choice == 'quit':
click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
- self.scheduler.stop_queueing()
+ self.stream.quit()
elif choice == 'continue':
click.echo("\nContinuing\n", err=True)
@@ -473,7 +467,7 @@ class App():
# Dont attempt to handle a failure if the user has already opted to
# terminate
- if not success and not self.scheduler.terminated:
+ if not success and not self.stream.terminated:
# Get the last failure message for additional context
failure = self._fail_messages.get(element._get_unique_id())
@@ -494,9 +488,9 @@ class App():
if not self._interactive_failures:
if self.context.sched_error_action == 'terminate':
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
elif self.context.sched_error_action == 'quit':
- self.scheduler.stop_queueing()
+ self.stream.quit()
elif self.context.sched_error_action == 'continue':
pass
return
@@ -551,11 +545,11 @@ class App():
if choice == 'terminate':
click.echo("\nTerminating all jobs\n", err=True)
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
else:
if choice == 'quit':
click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
- self.scheduler.stop_queueing()
+ self.stream.quit()
elif choice == 'continue':
click.echo("\nContinuing with other non failing elements\n", err=True)
elif choice == 'retry':
@@ -567,10 +561,12 @@ class App():
# Print the session heading if we've loaded a pipeline and there
# is going to be a session
#
- def loaded_cb(self, pipeline):
+ def session_start_cb(self):
+ self._started = True
if self._session_name:
- self.logger.print_heading(pipeline,
- self._main_options['log_file'],
+ self.logger.print_heading(self.project,
+ self.stream,
+ log_file=self._main_options['log_file'],
styling=self.colors)
#
@@ -578,7 +574,7 @@ class App():
#
def _print_summary(self):
click.echo("", err=True)
- self.logger.print_summary(self.stream, self.scheduler,
+ self.logger.print_summary(self.stream,
self._main_options['log_file'],
styling=self.colors)
@@ -642,7 +638,7 @@ class App():
def _interrupted(self):
self._status.clear()
try:
- with self.scheduler.jobs_suspended():
+ with self.stream.suspend():
yield
finally:
self._maybe_render_status()
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index 13e00f58f..0e5855181 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -39,7 +39,6 @@ from .widget import TimeCode
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
# stream (Stream): The Stream
-# scheduler (Scheduler): The Scheduler
# colors (bool): Whether to print the ANSI color codes in the output
#
class Status():
@@ -47,14 +46,14 @@ class Status():
def __init__(self, context,
content_profile, format_profile,
success_profile, error_profile,
- stream, scheduler, colors=False):
+ stream, colors=False):
self._context = context
self._content_profile = content_profile
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
- self._scheduler = scheduler
+ self._stream = stream
self._jobs = []
self._last_lines = 0 # Number of status lines we last printed to console
self._term = Terminal()
@@ -63,7 +62,7 @@ class Status():
self._header = _StatusHeader(context,
content_profile, format_profile,
success_profile, error_profile,
- stream, scheduler)
+ stream)
self._term_width, _ = click.get_terminal_size()
self._alloc_lines = 0
@@ -80,7 +79,7 @@ class Status():
# action_name (str): The action name for this job
#
def add_job(self, element, action_name):
- elapsed = self._scheduler.elapsed_time()
+ elapsed = self._stream.elapsed_time
job = _StatusJob(self._context, element, action_name, self._content_profile, self._format_profile, elapsed)
self._jobs.append(job)
self._need_alloc = True
@@ -136,7 +135,7 @@ class Status():
if not self._term.does_styling:
return
- elapsed = self._scheduler.elapsed_time()
+ elapsed = self._stream.elapsed_time
self.clear()
self._check_term_width()
@@ -251,14 +250,13 @@ class Status():
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
# stream (Stream): The Stream
-# scheduler (Scheduler): The Scheduler
#
class _StatusHeader():
def __init__(self, context,
content_profile, format_profile,
success_profile, error_profile,
- stream, scheduler):
+ stream):
#
# Public members
@@ -273,7 +271,6 @@ class _StatusHeader():
self._success_profile = success_profile
self._error_profile = error_profile
self._stream = stream
- self._scheduler = scheduler
self._time_code = TimeCode(context, content_profile, format_profile)
self._context = context
@@ -283,8 +280,8 @@ class _StatusHeader():
size = 0
text = ''
- session = str(self._stream.session_elements)
- total = str(self._stream.total_elements)
+ session = str(len(self._stream.session_elements))
+ total = str(len(self._stream.total_elements))
# Format and calculate size for target and overall time code
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
@@ -303,10 +300,10 @@ class _StatusHeader():
text = ''
# Format and calculate size for each queue progress
- for queue in self._scheduler.queues:
+ for queue in self._stream.queues:
# Add spacing
- if self._scheduler.queues.index(queue) > 0:
+ if self._stream.queues.index(queue) > 0:
size += 2
text += self._format_profile.fmt('→ ')
@@ -366,7 +363,7 @@ class _StatusHeader():
# action_name (str): The name of the action
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
-# elapsed (datetime): The offset of the scheduler when this job is created
+# elapsed (datetime): The offset into the session when this job is created
#
class _StatusJob():
diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py
index 814f87ff5..5b405682a 100644
--- a/buildstream/_frontend/widget.py
+++ b/buildstream/_frontend/widget.py
@@ -28,7 +28,7 @@ import click
from ruamel import yaml
from . import Profile
-from .. import Element, Scope, Consistency
+from .. import Element, Consistency
from .. import _yaml
from .. import __version__ as bst_version
from .._exceptions import ImplError
@@ -435,23 +435,17 @@ class LogLine(Widget):
# and so on.
#
# Args:
- # pipeline (Pipeline): The pipeline to print the heading of
+ # project (Project): The toplevel project we were invoked from
+ # stream (Stream): The stream
# log_file (file): An optional file handle for additional logging
- # deps (list): Optional list of elements, default is to use the whole pipeline
# styling (bool): Whether to enable ansi escape codes in the output
#
- def print_heading(self, pipeline, log_file, deps=None, styling=False):
+ def print_heading(self, project, stream, *, log_file, styling=False):
context = self.context
- project = pipeline.project
starttime = datetime.datetime.now()
text = ''
- assert self._resolved_keys is None
- elements = set()
- visited = {}
- for element in pipeline.targets:
- elements.update(element.dependencies(Scope.ALL, visited=visited))
- self._resolved_keys = {element: element._get_cache_key() for element in elements}
+ self._resolved_keys = {element: element._get_cache_key() for element in stream.session_elements}
# Main invocation context
text += '\n'
@@ -459,7 +453,7 @@ class LogLine(Widget):
values = OrderedDict()
values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S')
values["Project"] = "{} ({})".format(project.name, project.directory)
- values["Targets"] = ", ".join([t.name for t in pipeline.targets])
+ values["Targets"] = ", ".join([t.name for t in stream.targets])
text += self._format_values(values)
# User configurations
@@ -494,9 +488,7 @@ class LogLine(Widget):
# Pipeline state
text += self.content_profile.fmt("Pipeline\n", bold=True)
- if deps is None:
- deps = pipeline.dependencies(Scope.ALL)
- text += self.show_pipeline(deps, context.log_element_format)
+ text += self.show_pipeline(stream.total_elements, context.log_element_format)
text += '\n'
# Separator line before following output
@@ -512,16 +504,15 @@ class LogLine(Widget):
#
# Args:
# stream (Stream): The Stream
- # scheduler (Scheduler): The Scheduler
# log_file (file): An optional file handle for additional logging
# styling (bool): Whether to enable ansi escape codes in the output
#
- def print_summary(self, stream, scheduler, log_file, styling=False):
+ def print_summary(self, stream, log_file, styling=False):
# Early silent return if there are no queues, can happen
- # only in the case that the pipeline early returned due to
+ # only in the case that the stream early returned due to
# an inconsistent pipeline state.
- if scheduler.queues is None:
+ if not stream.queues:
return
text = ''
@@ -544,18 +535,18 @@ class LogLine(Widget):
text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
values = OrderedDict()
- values['Total'] = self.content_profile.fmt(str(stream.total_elements))
- values['Session'] = self.content_profile.fmt(str(stream.session_elements))
+ values['Total'] = self.content_profile.fmt(str(len(stream.total_elements)))
+ values['Session'] = self.content_profile.fmt(str(len(stream.session_elements)))
processed_maxlen = 1
skipped_maxlen = 1
failed_maxlen = 1
- for queue in scheduler.queues:
+ for queue in stream.queues:
processed_maxlen = max(len(str(len(queue.processed_elements))), processed_maxlen)
skipped_maxlen = max(len(str(len(queue.skipped_elements))), skipped_maxlen)
failed_maxlen = max(len(str(len(queue.failed_elements))), failed_maxlen)
- for queue in scheduler.queues:
+ for queue in stream.queues:
processed = str(len(queue.processed_elements))
skipped = str(len(queue.skipped_elements))
failed = str(len(queue.failed_elements))
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 5327d3fb6..8861556c9 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -18,13 +18,16 @@
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# Jürg Billeter <juerg.billeter@codethink.co.uk>
+# Tristan Maat <tristan.maat@codethink.co.uk>
+import os
import itertools
from operator import itemgetter
from ._exceptions import PipelineError
from ._message import Message, MessageType
from ._loader import Loader
+from ._profile import Topics, profile_start, profile_end
from .element import Element
from . import Scope, Consistency
from ._project import ProjectRefStorage
@@ -60,60 +63,59 @@ class PipelineSelection():
# Pipeline()
#
# Args:
-# context (Context): The Context object
# project (Project): The Project object
-# target (str): A bst filename relative to the project directory
-# inconsistent (bool): Whether to load the pipeline in a forcefully inconsistent state,
-# this is appropriate when source tracking will run and the
-# current source refs will not be the effective refs.
-# rewritable (bool): Whether the loaded files should be rewritable
-# this is a bit more expensive due to deep copies
-# fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
-# loading process, if they are not yet locally cached
-#
-# The ticker methods will be called with an element name for each tick, a final
-# tick with None as the argument is passed to signal that processing of this
-# stage has terminated.
-#
-# Raises:
-# LoadError
-# PluginError
-# SourceError
-# ElementError
-# ProgramNotFoundError
+# context (Context): The Context object
+# artifacts (Context): The ArtifactCache object
#
class Pipeline():
- def __init__(self, context, project, artifacts, targets, except_, *,
- rewritable=False,
- fetch_subprojects=True):
+ def __init__(self, context, project, artifacts):
- self.context = context # The Context
- self.project = project # The toplevel project
- self.targets = [] # List of toplevel target Element objects
+ self._context = context # The Context
+ self._project = project # The toplevel project
#
# Private members
#
self._artifacts = artifacts
self._loader = None
- self._exceptions = None
- self._track_cross_junctions = False
- self._track_elements = []
- #
- # Early initialization
- #
+ # load()
+ #
+ # Loads elements from target names.
+ #
+ # This function is called with a list of lists, such that multiple
+ # target groups may be specified. Element names specified in `targets`
+ # are allowed to be redundant.
+ #
+ # Args:
+ # target_groups (list of lists): Groups of toplevel targets to load
+ # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
+ # loading process, if they are not yet locally cached
+ # rewritable (bool): Whether the loaded files should be rewritable
+ # this is a bit more expensive due to deep copies
+ #
+ # Returns:
+ # (tuple of lists): A tuple of grouped Element objects corresponding to target_groups
+ #
+ def load(self, target_groups, *,
+ fetch_subprojects=True,
+ rewritable=False):
+
+ # First concatenate all the lists for the loader's sake
+ targets = list(itertools.chain(*target_groups))
- self._loader = Loader(self.context, self.project, targets + except_,
+ profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))
+
+ self._loader = Loader(self._context, self._project, targets,
fetch_subprojects=fetch_subprojects)
- with self.context.timed_activity("Loading pipeline", silent_nested=True):
+ with self._context.timed_activity("Loading pipeline", silent_nested=True):
meta_elements = self._loader.load(rewritable, None)
# Resolve the real elements now that we've loaded the project
- with self.context.timed_activity("Resolving pipeline"):
- resolved_elements = [
+ with self._context.timed_activity("Resolving pipeline"):
+ elements = [
Element._new_from_meta(meta, self._artifacts)
for meta in meta_elements
]
@@ -130,61 +132,89 @@ class Pipeline():
detail += "\n".join(lines)
self._message(MessageType.WARN, "Ignoring redundant source references", detail=detail)
- self.targets = resolved_elements[:len(targets)]
- self._exceptions = resolved_elements[len(targets):]
+ # Now create element groups to match the input target groups
+ elt_iter = iter(elements)
+ element_groups = [
+ [next(elt_iter) for i in range(len(group))]
+ for group in target_groups
+ ]
+
+ profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))
+
+ return tuple(element_groups)
- # initialize()
+ # resolve_elements()
#
- # Initialize the pipeline
+ # Resolve element state and cache keys.
#
# Args:
- # track_element (list of Elements): List of elements specified by the frontend for tracking
- # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
- # track_selection (PipelineSelection): The selection algorithm for track elements
+ # targets (list of Element): The list of toplevel element targets
#
- def initialize(self,
- track_elements=None,
- track_cross_junctions=False,
- track_selection=PipelineSelection.ALL):
+ def resolve_elements(self, targets):
+ with self._context.timed_activity("Resolving cached state", silent_nested=True):
+ for element in self.dependencies(targets, Scope.ALL):
- # Preflight directly, before ever interrogating caches or anything.
- self._preflight()
+ # Preflight
+ element._preflight()
- # Work out what we're going track, if anything
- self._track_cross_junctions = track_cross_junctions
- if track_elements:
- self._track_elements = self._get_elements_to_track(track_elements, track_selection)
-
- # Now resolve the cache keys once tracking elements have been resolved
- self._resolve_cache_keys()
+ # Determine initial element state.
+ element._update_state()
- # cleanup()
+ # dependencies()
#
- # Cleans up resources used by the Pipeline.
+ # Generator function to iterate over elements and optionally
+ # also iterate over sources.
#
- def cleanup(self):
- if self._loader:
- self._loader.cleanup()
+ # Args:
+ # targets (list of Element): The target Elements to loop over
+ # scope (Scope): The scope to iterate over
+ # recurse (bool): Whether to recurse into dependencies
+ #
+ def dependencies(self, targets, scope, *, recurse=True):
+ # Keep track of 'visited' in this scope, so that all targets
+ # share the same context.
+ visited = {}
- # Reset the element loader state
- Element._reset_load_state()
+ for target in targets:
+ for element in target.dependencies(scope, recurse=recurse, visited=visited):
+ yield element
+
+ # plan()
+ #
+ # Generator function to iterate over only the elements
+ # which are required to build the pipeline target, omitting
+ # cached elements. The elements are yielded in a depth sorted
+ # ordering for optimal build plans
+ #
+ # Args:
+ # elements (list of Element): List of target elements to plan
+ #
+ # Returns:
+ # (list of Element): A depth sorted list of the build plan
+ #
+ def plan(self, elements):
+ return _Planner().plan(elements)
# get_selection()
#
+ # Gets a full list of elements based on a toplevel
+ # list of element targets
+ #
# Args:
+ # targets (list of Element): The target Elements
# mode (PipelineSelection): The PipelineSelection mode
#
# Various commands define a --deps option to specify what elements to
# use in the result, this function reports a list that is appropriate for
# the selected option.
#
- def get_selection(self, mode):
+ def get_selection(self, targets, mode):
elements = None
if mode == PipelineSelection.NONE:
- elements = self.targets
+ elements = targets
elif mode == PipelineSelection.PLAN:
- elements = list(self._plan())
+ elements = self.plan(targets)
else:
if mode == PipelineSelection.ALL:
scope = Scope.ALL
@@ -193,49 +223,27 @@ class Pipeline():
elif mode == PipelineSelection.RUN:
scope = Scope.RUN
- elements = list(self.dependencies(scope))
-
- return self.remove_elements(elements)
-
- # dependencies()
- #
- # Generator function to iterate over elements and optionally
- # also iterate over sources.
- #
- # Args:
- # scope (Scope): The scope to iterate over
- # recurse (bool): Whether to recurse into dependencies
- # include_sources (bool): Whether to include element sources in iteration
- #
- def dependencies(self, scope, *, recurse=True, include_sources=False):
- # Keep track of 'visited' in this scope, so that all targets
- # share the same context.
- visited = {}
-
- for target in self.targets:
- for element in target.dependencies(scope, recurse=recurse, visited=visited):
- if include_sources:
- for source in element.sources():
- yield source
- yield element
+ elements = list(self.dependencies(targets, scope))
- #############################################################
- # Commands #
- #############################################################
+ return elements
- # remove_elements():
- #
- # Internal function
+ # except_elements():
#
# Return what we are left with after the intersection between
# excepted and target elements and their unique dependencies is
# gone.
#
# Args:
- # elements (list of elements): The list to remove elements from.
- def remove_elements(self, elements):
- targeted = list(self.dependencies(Scope.ALL))
-
+ # targets (list of Element): List of toplevel targetted elements
+ # elements (list of Element): The list to remove elements from
+ # except_targets (list of Element): List of toplevel except targets
+ #
+ # Returns:
+ # (list of Element): The elements list with the intersected
+ # exceptions removed
+ #
+ def except_elements(self, targets, elements, except_targets):
+ targeted = list(self.dependencies(targets, Scope.ALL))
visited = []
def find_intersection(element):
@@ -255,7 +263,7 @@ class Pipeline():
# elements that lie on the border closest to excepted elements
# between excepted and target elements.
intersection = list(itertools.chain.from_iterable(
- find_intersection(element) for element in self._exceptions
+ find_intersection(element) for element in except_targets
))
# Now use this set of elements to traverse the targeted
@@ -264,7 +272,7 @@ class Pipeline():
queue = []
visited = []
- queue.extend(self.targets)
+ queue.extend(targets)
while queue:
element = queue.pop()
if element in visited or element in intersection:
@@ -282,89 +290,77 @@ class Pipeline():
# in before.
return [element for element in elements if element in visited]
- #############################################################
- # Private Methods #
- #############################################################
-
- # _get_elements_to_track():
+ # targets_include()
+ #
+ # Checks whether the given targets are, or depend on some elements
#
- # Work out which elements are going to be tracked.
+ # Args:
+ # targets (list of Element): A list of targets
+ # elements (list of Element): List of elements to check
+ #
+ # Returns:
+ # (bool): True if all of `elements` are the `targets`, or are
+ # somehow depended on by `targets`.
#
- # Currently the 'mode' parameter only accepts
- # PipelineSelection.NONE or PipelineSelection.ALL
+ def targets_include(self, targets, elements):
+ target_element_set = set(self.dependencies(targets, Scope.ALL))
+ element_set = set(elements)
+ return element_set.issubset(target_element_set)
+
+ # subtract_elements()
#
- # This makes the assumption that the except elements are
- # meant to be removed from tracking element lists.
+ # Subtract a subset of elements
#
# Args:
- # track_targets (list of str): List of target names
- # mode (PipelineSelection): The PipelineSelection mode
+ # elements (list of Element): The element list
+ # subtract (list of Element): List of elements to subtract from elements
#
# Returns:
- # (list): List of Element objects to track
+ # (list): The original elements list, with elements in subtract removed
#
- def _get_elements_to_track(self, track_targets, mode=PipelineSelection.ALL):
- planner = _Planner()
-
- # Convert target names to elements
- track_elements = [e for e in self.dependencies(Scope.ALL)
- if e.name in track_targets]
-
- if mode != PipelineSelection.NONE:
- assert mode == PipelineSelection.ALL
-
- # Plan them out
- track_elements = planner.plan(track_elements, ignore_cache=True)
-
- # Filter out --except elements
- track_elements = self.remove_elements(track_elements)
-
- # Filter out cross junctioned elements
- if self._track_cross_junctions:
- self._assert_junction_tracking(track_elements)
- else:
- track_elements = self._filter_cross_junctions(track_elements)
-
- return track_elements
+ def subtract_elements(self, elements, subtract):
+ subtract_set = set(subtract)
+ return [
+ e for e in elements
+ if e not in subtract_set
+ ]
- # _prefilght()
+ # track_cross_junction_filter()
#
- # Preflights all the plugins in the pipeline
+ # Filters out elements which are across junction boundaries,
+ # otherwise asserts that there are no such elements.
#
- def _preflight(self):
- for element in self.dependencies(Scope.ALL):
- element._preflight()
-
- # _resolve_cache_keys()
+ # This is currently assumed to be only relevant for element
+ # lists targetted at tracking.
#
- # Initially resolve the cache keys
+ # Args:
+ # elements (list of Element): The list of elements to filter
+ # cross_junction_requested (bool): Whether the user requested
+ # cross junction tracking
#
- def _resolve_cache_keys(self):
- track_elements = set(self._track_elements)
-
- with self.context.timed_activity("Resolving cached state", silent_nested=True):
- for element in self.dependencies(Scope.ALL):
- if element in track_elements:
- # Load the pipeline in an explicitly inconsistent state, use
- # this for pipelines with tracking queues enabled.
- element._schedule_tracking()
+ # Returns:
+ # (list of Element): The filtered or asserted result
+ #
+ def track_cross_junction_filter(self, elements, cross_junction_requested):
+ # Filter out cross junctioned elements
+ if cross_junction_requested:
+ self._assert_junction_tracking(elements)
+ else:
+ elements = self._filter_cross_junctions(elements)
- # Determine initial element state. This may resolve cache keys
- # and interrogate the artifact cache.
- element._update_state()
+ return elements
- # _assert_consistent()
+ # assert_consistent()
#
- # Asserts that the pipeline is in a consistent state, that
- # is to say that all sources are consistent and can at least
- # be fetched.
+ # Asserts that the given list of elements are in a consistent state, that
+ # is to say that all sources are consistent and can at least be fetched.
#
# Consequently it also means that cache keys can be resolved.
#
- def _assert_consistent(self, toplevel):
+ def assert_consistent(self, elements):
inconsistent = []
- with self.context.timed_activity("Checking sources"):
- for element in toplevel:
+ with self._context.timed_activity("Checking sources"):
+ for element in elements:
if element._get_consistency() == Consistency.INCONSISTENT:
inconsistent.append(element)
@@ -375,6 +371,21 @@ class Pipeline():
detail += " " + element._get_full_name() + "\n"
raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")
+ # cleanup()
+ #
+ # Cleans up resources used by the Pipeline.
+ #
+ def cleanup(self):
+ if self._loader:
+ self._loader.cleanup()
+
+ # Reset the element loader state
+ Element._reset_load_state()
+
+ #############################################################
+ # Private Methods #
+ #############################################################
+
# _filter_cross_junction()
#
# Filters out cross junction elements from the elements
@@ -389,7 +400,7 @@ class Pipeline():
def _filter_cross_junctions(self, elements):
return [
element for element in elements
- if element._get_project() is self.project
+ if element._get_project() is self._project
]
# _assert_junction_tracking()
@@ -404,7 +415,7 @@ class Pipeline():
# We can track anything if the toplevel project uses project.refs
#
- if self.project.ref_storage == ProjectRefStorage.PROJECT_REFS:
+ if self._project.ref_storage == ProjectRefStorage.PROJECT_REFS:
return
# Ideally, we would want to report every cross junction element but not
@@ -414,37 +425,19 @@ class Pipeline():
# But this is too hard, lets shoot for a simple error.
for element in elements:
element_project = element._get_project()
- if element_project is not self.project:
+ if element_project is not self._project:
detail = "Requested to track sources across junction boundaries\n" + \
"in a project which does not use project.refs ref-storage."
raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
- # _plan()
- #
- # Args:
- # except_ (bool): Whether to filter out the except elements from the plan
- #
- # Generator function to iterate over only the elements
- # which are required to build the pipeline target, omitting
- # cached elements. The elements are yielded in a depth sorted
- # ordering for optimal build plans
- def _plan(self, except_=True):
- build_plan = _Planner().plan(self.targets)
-
- if except_:
- build_plan = self.remove_elements(build_plan)
-
- for element in build_plan:
- yield element
-
# _message()
#
# Local message propagator
#
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
- self.context.message(
+ self._context.message(
Message(None, message_type, message, **args))
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 62d9f9804..c8d0bb69c 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -17,19 +17,22 @@
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+# Tristan Maat <tristan.maat@codethink.co.uk>
+
import os
import stat
import shlex
import shutil
import tarfile
+from contextlib import contextmanager
from tempfile import TemporaryDirectory
-from ._exceptions import StreamError, ImplError, BstError
+from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
from ._message import Message, MessageType
-from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
from ._pipeline import Pipeline, PipelineSelection
from ._platform import Platform
-from ._profile import Topics, profile_start, profile_end
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -41,25 +44,46 @@ from . import Scope, Consistency
# Args:
# context (Context): The Context object
# project (Project): The Project object
-# loaded_callback (callable): A callback to invoke when the pipeline is loaded
+# session_start (datetime): The time when the session started
+# session_start_callback (callable): A callback to invoke when the session starts
+# interrupt_callback (callable): A callback to invoke when we get interrupted
+# ticker_callback (callable): Invoked every second while running the scheduler
+# job_start_callback (callable): Called when a job starts
+# job_complete_callback (callable): Called when a job completes
#
class Stream():
- def __init__(self, context, project, loaded_callback):
- self.session_elements = 0 # Number of elements to process in this session
- self.total_elements = 0 # Number of total potential elements for this pipeline
-
- self._context = context
- self._project = project
- self._scheduler = None
- self._pipeline = None
+ def __init__(self, context, project, session_start, *,
+ session_start_callback=None,
+ interrupt_callback=None,
+ ticker_callback=None,
+ job_start_callback=None,
+ job_complete_callback=None):
- self._loaded_cb = loaded_callback
+ #
+ # Public members
+ #
+ self.targets = [] # Resolved target elements
+ self.session_elements = [] # List of elements being processed this session
+ self.total_elements = [] # Total list of elements based on targets
+ self.queues = [] # Queue objects
- # Load selected platform
+ #
+ # Private members
+ #
Platform.create_instance(context, project)
self._platform = Platform.get_platform()
self._artifacts = self._platform.artifactcache
+ self._context = context
+ self._project = project
+ self._pipeline = Pipeline(context, project, self._artifacts)
+ self._scheduler = Scheduler(context, session_start,
+ interrupt_callback=interrupt_callback,
+ ticker_callback=ticker_callback,
+ job_start_callback=job_start_callback,
+ job_complete_callback=job_complete_callback)
+ self._first_non_track_queue = None
+ self._session_start_callback = session_start_callback
# cleanup()
#
@@ -81,14 +105,18 @@ class Stream():
# except_targets (list of str): Specified targets to except from fetching
# downloadable (bool): Whether the downloadable state of elements should be resolved
#
+ # Returns:
+ # (list of Element): The selected elements
def load_selection(self, targets, *,
selection=PipelineSelection.NONE,
except_targets=(),
downloadable=False):
- self.init_pipeline(targets, except_=except_targets,
- use_configured_remote_caches=downloadable,
- fetch_subprojects=False)
- return self._pipeline.get_selection(selection)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ except_targets=except_targets,
+ use_artifact_config=downloadable,
+ fetch_subprojects=False)
+ return elements
# shell()
#
@@ -118,7 +146,7 @@ class Stream():
if directory is None:
missing_deps = [
dep._get_full_name()
- for dep in self._pipeline.dependencies(scope)
+ for dep in self._pipeline.dependencies([element], scope)
if not dep._cached()
]
if missing_deps:
@@ -145,66 +173,47 @@ class Stream():
track_cross_junctions=False,
build_all=False):
- rewritable = False
- if track_targets:
- rewritable = True
+ if build_all or track_targets:
+ selection = PipelineSelection.ALL
+ else:
+ selection = PipelineSelection.PLAN
- self.init_pipeline(targets,
- except_=track_except,
- rewritable=rewritable,
- use_configured_remote_caches=True,
- track_elements=track_targets,
- track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True)
+ elements, track_elements = \
+ self._load(targets, track_targets,
+ selection=selection, track_selection=PipelineSelection.ALL,
+ track_except_targets=track_except,
+ track_cross_junctions=track_cross_junctions,
+ use_artifact_config=True,
+ fetch_subprojects=True)
- if build_all:
- plan = self._pipeline.dependencies(Scope.ALL)
- else:
- plan = self._pipeline._plan(except_=False)
-
- # We want to start the build queue with any elements that are
- # not being tracked first
- track_elements = set(self._pipeline._track_elements)
- plan = [e for e in plan if e not in track_elements]
-
- # Assert that we have a consistent pipeline now (elements in
- # track_plan will be made consistent)
- self._pipeline._assert_consistent(plan)
-
- fetch = FetchQueue(self._scheduler, skip_cached=True)
- build = BuildQueue(self._scheduler)
- track = None
- pull = None
- push = None
- queues = []
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- queues.append(track)
- if self._pipeline._artifacts.has_fetch_remotes():
- pull = PullQueue(self._scheduler)
- queues.append(pull)
- queues.append(fetch)
- queues.append(build)
- if self._pipeline._artifacts.has_push_remotes():
- push = PushQueue(self._scheduler)
- queues.append(push)
-
- # If we're going to track, tracking elements go into the first queue
- # which is the tracking queue, the rest of the plan goes into the next
- # queue (whatever that happens to be)
- if track:
- queues[0].enqueue(self._pipeline._track_elements)
- queues[1].enqueue(plan)
- else:
- queues[0].enqueue(plan)
+ # Remove the tracking elements from the main targets
+ elements = self._pipeline.subtract_elements(elements, track_elements)
- self.session_elements = len(self._pipeline._track_elements) + len(plan)
+ # Assert that the elements we're not going to track are consistent
+ self._pipeline.assert_consistent(elements)
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Now construct the queues
+ #
+ track_queue = None
+ if track_elements:
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+
+ if self._artifacts.has_fetch_remotes():
+ self._add_queue(PullQueue(self._scheduler))
+
+ self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
+ self._add_queue(BuildQueue(self._scheduler))
+
+ if self._artifacts.has_push_remotes():
+ self._add_queue(PushQueue(self._scheduler))
+
+ # Enqueue elements
+ #
+ if track_elements:
+ self._enqueue_plan(track_elements, queue=track_queue)
+ self._enqueue_plan(elements)
+ self._run()
# fetch()
#
@@ -223,21 +232,25 @@ class Stream():
track_targets=False,
track_cross_junctions=False):
- rewritable = False
if track_targets:
- rewritable = True
-
- self.init_pipeline(targets,
- except_=except_targets,
- rewritable=rewritable,
- track_elements=targets if track_targets else None,
- track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True)
+ track_targets = targets
+ track_selection = selection
+ track_except_targets = except_targets
+ else:
+ track_targets = ()
+ track_selection = PipelineSelection.NONE
+ track_except_targets = ()
- fetch_plan = self._pipeline.get_selection(selection)
+ elements, track_elements = \
+ self._load(targets, track_targets,
+ selection=selection, track_selection=track_selection,
+ except_targets=except_targets,
+ track_except_targets=track_except_targets,
+ track_cross_junctions=track_cross_junctions,
+ fetch_subprojects=True)
- # Delegated to a shared method for now
- self._do_fetch(fetch_plan)
+ # Delegated to a shared fetch method
+ self._fetch(elements, track_elements=track_elements)
# track()
#
@@ -255,26 +268,20 @@ class Stream():
def track(self, targets, *,
selection=PipelineSelection.NONE,
except_targets=None,
- track_targets=False,
cross_junctions=False):
- self.init_pipeline(targets,
- except_=except_targets,
- rewritable=True,
- track_elements=targets,
- track_cross_junctions=cross_junctions,
- track_selection=selection,
- fetch_subprojects=True)
+ _, elements = \
+ self._load(targets, targets,
+ selection=selection, track_selection=selection,
+ except_targets=except_targets,
+ track_except_targets=except_targets,
+ track_cross_junctions=cross_junctions,
+ fetch_subprojects=True)
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- self.session_elements = len(self._pipeline._track_elements)
-
- _, status = self._scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+ self._enqueue_plan(elements, queue=track_queue)
+ self._run()
# pull()
#
@@ -292,33 +299,23 @@ class Stream():
selection=PipelineSelection.NONE,
remote=None):
- use_configured_remote_caches = True
- if remote is not None:
- use_configured_remote_caches = False
+ use_config = True
+ if remote:
+ use_config = False
- self.init_pipeline(targets,
- use_configured_remote_caches=use_configured_remote_caches,
- add_remote_cache=remote,
- fetch_subprojects=True)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ fetch_subprojects=True)
- elements = self._pipeline.get_selection(selection)
-
- if not self._pipeline._artifacts.has_fetch_remotes():
+ if not self._artifacts.has_fetch_remotes():
raise StreamError("No artifact caches available for pulling artifacts")
- plan = elements
- self._pipeline._assert_consistent(plan)
- self._pipeline.session_elements = len(plan)
-
- pull = PullQueue(self._scheduler)
- pull.enqueue(plan)
- queues = [pull]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ self._pipeline.assert_consistent(elements)
+ self._add_queue(PullQueue(self._scheduler))
+ self._enqueue_plan(elements)
+ self._run()
# push()
#
@@ -336,33 +333,23 @@ class Stream():
selection=PipelineSelection.NONE,
remote=None):
- use_configured_remote_caches = True
- if remote is not None:
- use_configured_remote_caches = False
-
- self.init_pipeline(targets,
- use_configured_remote_caches=use_configured_remote_caches,
- add_remote_cache=remote,
- fetch_subprojects=True)
+ use_config = True
+ if remote:
+ use_config = False
- elements = self._pipeline.get_selection(selection)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ fetch_subprojects=True)
- if not self._pipeline._artifacts.has_push_remotes():
+ if not self._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
- plan = elements
- self._pipeline._assert_consistent(plan)
- self._pipeline.session_elements = len(plan)
-
- push = PushQueue(self._scheduler)
- push.enqueue(plan)
- queues = [push]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ self._pipeline.assert_consistent(elements)
+ self._add_queue(PushQueue(self._scheduler))
+ self._enqueue_plan(elements)
+ self._run()
# checkout()
#
@@ -382,10 +369,9 @@ class Stream():
integrate=True,
hardlinks=False):
- self.init_pipeline((target,), fetch_subprojects=True)
-
# We only have one target in a checkout command
- target = self._pipeline.targets[0]
+ elements, _ = self._load((target,), (), fetch_subprojects=True)
+ target = elements[0]
try:
os.makedirs(directory, exist_ok=True)
@@ -433,13 +419,13 @@ class Stream():
track_first,
force):
- self.init_pipeline((target,),
- track_elements=[target] if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=False)
+ if track_first:
+ track_targets = (target,)
+ else:
+ track_targets = ()
- target = self._pipeline.targets[0]
+ elements, track_elements = self._load((target,), track_targets)
+ target = elements[0]
workdir = os.path.abspath(directory)
if not list(target.sources()):
@@ -459,11 +445,11 @@ class Stream():
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
#
- # For now, tracking is handled by _do_fetch() automatically
- # by virtue of our screwed up pipeline initialization stuff.
- #
if not no_checkout or track_first:
- self._do_fetch([target])
+ track_elements = []
+ if track_first:
+ track_elements = elements
+ self._fetch(elements, track_elements=track_elements)
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise StreamError("Could not stage uncached source. " +
@@ -522,34 +508,35 @@ class Stream():
#
def workspace_reset(self, targets, *, track_first):
- self.init_pipeline(targets,
- track_elements=targets if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=False)
+ if track_first:
+ track_targets = targets
+ else:
+ track_targets = ()
+
+ elements, track_elements = self._load(targets, track_targets)
# Do the tracking first
if track_first:
- self._do_fetch(self._pipeline.targets)
+ self._fetch(elements, track_elements=track_elements)
- for target in self._pipeline.targets:
- workspace = self._project.workspaces.get_workspace(target.name)
+ for element in elements:
+ workspace = self._project.workspaces.get_workspace(element.name)
- with target.timed_activity("Removing workspace directory {}"
- .format(workspace.path)):
+ with element.timed_activity("Removing workspace directory {}"
+ .format(workspace.path)):
try:
shutil.rmtree(workspace.path)
except OSError as e:
raise StreamError("Could not remove '{}': {}"
.format(workspace.path, e)) from e
- self._project.workspaces.delete_workspace(target.name)
- self._project.workspaces.create_workspace(target.name, workspace.path)
+ self._project.workspaces.delete_workspace(element.name)
+ self._project.workspaces.create_workspace(element.name, workspace.path)
- with target.timed_activity("Staging sources to {}".format(workspace.path)):
- target._open_workspace()
+ with element.timed_activity("Staging sources to {}".format(workspace.path)):
+ element._open_workspace()
- self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path))
+ self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace.path))
self._project.workspaces.save_config()
@@ -609,15 +596,20 @@ class Stream():
force=False,
compression="gz"):
- self.init_pipeline((target,),
- track_elements=[target] if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=True)
+ if track_first:
+ track_targets = (target,)
+ else:
+ track_targets = ()
+
+ elements, track_elements = self._load((target,), track_targets,
+ selection=PipelineSelection.ALL,
+ track_selection=PipelineSelection.ALL,
+ fetch_subprojects=True)
# source-bundle only supports one target
- target = self._pipeline.targets[0]
- dependencies = self._pipeline.get_selection(PipelineSelection.ALL)
+ target = self.targets[0]
+
+ self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -635,14 +627,15 @@ class Stream():
raise StreamError("Cannot write to {0}: {1}"
.format(tar_location, e)) from e
- plan = list(dependencies)
- self._do_fetch(plan)
+ # Fetch and possibly track first
+ #
+ self._fetch(elements, track_elements=track_elements)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
# Create a temporary directory to build the source tree in
- builddir = target._get_context().builddir
+ builddir = self._context.builddir
prefix = "{}-".format(target.normal_name)
with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
@@ -655,18 +648,162 @@ class Stream():
# Any elements that don't implement _write_script
# should not be included in the later stages.
- plan = [element for element in plan
- if self._write_element_script(source_directory, element)]
+ elements = [
+ element for element in elements
+ if self._write_element_script(source_directory, element)
+ ]
- self._write_element_sources(tempdir, plan)
- self._write_build_script(tempdir, plan)
+ self._write_element_sources(tempdir, elements)
+ self._write_build_script(tempdir, elements)
self._collect_sources(tempdir, tar_location,
target.normal_name, compression)
#############################################################
- # Private Methods #
+ # Scheduler API forwarding #
+ #############################################################
+
+ # running
+ #
+ # Whether the scheduler is running
+ #
+ @property
+ def running(self):
+ return self._scheduler.loop is not None
+
+ # suspended
+ #
+ # Whether the scheduler is currently suspended
+ #
+ @property
+ def suspended(self):
+ return self._scheduler.suspended
+
+ # terminated
+ #
+ # Whether the scheduler is currently terminated
+ #
+ @property
+ def terminated(self):
+ return self._scheduler.terminated
+
+ # elapsed_time
+ #
+ # Elapsed time since the session start
+ #
+ @property
+ def elapsed_time(self):
+ return self._scheduler.elapsed_time()
+
+ # terminate()
+ #
+ # Terminate jobs
+ #
+ def terminate(self):
+ self._scheduler.terminate_jobs()
+
+ # quit()
+ #
+ # Quit the session, this will continue with any ongoing
+ # jobs, use Stream.terminate() instead for cancellation
+ # of ongoing jobs
+ #
+ def quit(self):
+ self._scheduler.stop_queueing()
+
+ # suspend()
+ #
+ # Context manager to suspend ongoing jobs
+ #
+ @contextmanager
+ def suspend(self):
+ with self._scheduler.jobs_suspended():
+ yield
+
+ #############################################################
+ # Private Methods #
#############################################################
+ # _load()
+ #
+ # A convenience method for loading element lists
+ #
+ # Args:
+ # targets (list of str): Main targets to load
+ # track_targets (list of str): Tracking targets
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # track_selection (PipelineSelection): The selection mode for the specified tracking targets
+ # except_targets (list of str): Specified targets to except from fetching
+ # track_except_targets (list of str): Specified targets to except from fetching
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
+ # use_artifact_config (bool): Whether to initialize artifacts with the config
+ # artifact_remote_url (bool): A remote url for initializing the artifacts
+ # fetch_subprojects (bool): Whether to fetch subprojects while loading
+ #
+ # Returns:
+ # (list of Element): The primary element selection
+ # (list of Element): The tracking element selection
+ #
+ def _load(self, targets, track_targets, *,
+ selection=PipelineSelection.NONE,
+ track_selection=PipelineSelection.NONE,
+ except_targets=(),
+ track_except_targets=(),
+ track_cross_junctions=False,
+ use_artifact_config=False,
+ artifact_remote_url=None,
+ fetch_subprojects=False):
+
+ # Load rewritable if we have any tracking selection to make
+ rewritable = False
+ if track_targets:
+ rewritable = True
+
+ # Load all targets
+ elements, except_elements, track_elements, track_except_elements = \
+ self._pipeline.load([targets, except_targets, track_targets, track_except_targets],
+ rewritable=rewritable,
+ fetch_subprojects=fetch_subprojects)
+
+ # Hold on to the targets
+ self.targets = elements
+
+ # Here we should raise an error if the track_elements targets
+ # are not dependencies of the primary targets, this is not
+ # supported.
+ #
+ # This can happen with `bst build --track`
+ #
+ if not self._pipeline.targets_include(elements, track_elements):
+ raise StreamError("Specified tracking targets that are not "
+ "within the scope of primary targets")
+
+ # First take care of marking tracking elements, this must be
+ # done before resolving element states.
+ #
+ assert track_selection != PipelineSelection.PLAN
+ track_selected = self._pipeline.get_selection(track_elements, track_selection)
+ track_selected = self._pipeline.except_elements(track_elements,
+ track_selected,
+ track_except_elements)
+ track_selected = self._pipeline.track_cross_junction_filter(track_selected,
+ track_cross_junctions)
+
+ for element in track_selected:
+ element._schedule_tracking()
+
+ # Connect to remote caches, this needs to be done before resolving element state
+ self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url)
+
+ # Now move on to loading primary selection.
+ #
+ self._pipeline.resolve_elements(elements)
+ selected = self._pipeline.get_selection(elements, selection)
+ selected = self._pipeline.except_elements(elements,
+ selected,
+ except_elements)
+
+ return selected, track_selected
+
# _message()
#
# Local message propagator
@@ -676,47 +813,103 @@ class Stream():
self._context.message(
Message(None, message_type, message, **args))
- # _do_fetch()
+ # _add_queue()
+ #
+ # Adds a queue to the stream
+ #
+ # Args:
+ # queue (Queue): Queue to add to the pipeline
+ # track (bool): Whether this is the tracking queue
+ #
+ def _add_queue(self, queue, *, track=False):
+ self.queues.append(queue)
+
+ if not (track or self._first_non_track_queue):
+ self._first_non_track_queue = queue
+
+ # _enqueue_plan()
+ #
+ # Enqueues planned elements to the specified queue.
+ #
+ # Args:
+ # plan (list of Element): The list of elements to be enqueued
+ # queue (Queue): The target queue, defaults to the first non-track queue
+ #
+ def _enqueue_plan(self, plan, *, queue=None):
+ queue = queue or self._first_non_track_queue
+
+ queue.enqueue(plan)
+ self.session_elements += plan
+
+ # _run()
+ #
+ # Common function for running the scheduler
+ #
+ def _run(self):
+
+ # Inform the frontend of the full list of elements
+ # and the list of elements which will be processed in this run
+ #
+ self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
+
+ if self._session_start_callback is not None:
+ self._session_start_callback()
+
+ _, status = self._scheduler.run(self.queues)
+
+ # Force update element states after a run, such that the summary
+ # is more coherent
+ try:
+ for element in self.total_elements:
+ element._update_state()
+ except BstError as e:
+ self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
+ set_last_task_error(e.domain, e.reason)
+ except Exception as e: # pylint: disable=broad-except
+ self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
+
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
+
+ # _fetch()
#
# Performs the fetch job, the body of this function is here because
# it is shared between a few internals.
#
# Args:
# elements (list of Element): Elements to fetch
+ # track_elements (list of Element): Elements to track
#
- def _do_fetch(self, elements):
+ def _fetch(self, elements, *, track_elements=None):
- fetch_plan = elements
+ if track_elements is None:
+ track_elements = []
# Subtract the track elements from the fetch elements, they will be added separately
- if self._pipeline._track_elements:
- track_elements = set(self._pipeline._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
+ fetch_plan = self._pipeline.subtract_elements(elements, track_elements)
# Assert consistency for the fetch elements
- self._pipeline._assert_consistent(fetch_plan)
+ self._pipeline.assert_consistent(fetch_plan)
# Filter out elements with cached sources, only from the fetch plan
# let the track plan resolve new refs.
cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+ fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached)
- fetch = FetchQueue(self._scheduler)
- fetch.enqueue(fetch_plan)
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
+ # Construct queues, enqueue and run
+ #
+ track_queue = None
+ if track_elements:
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+ self._add_queue(FetchQueue(self._scheduler))
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ if track_elements:
+ self._enqueue_plan(track_elements, queue=track_queue)
+ self._enqueue_plan(fetch_plan)
+ self._run()
# Helper function for checkout()
#
@@ -772,7 +965,7 @@ class Stream():
# Collect the sources in the given sandbox into a tarfile
def _collect_sources(self, directory, tar_name, element_name, compression):
- with self._pipeline.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
+ with self._context.timed_activity("Creating tarball {}".format(tar_name)):
if compression == "none":
permissions = "w:"
else:
@@ -780,64 +973,3 @@ class Stream():
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)
-
- #############################################################
- # TEMPORARY CRAP #
- #############################################################
-
- # init_pipeline()
- #
- # Initialize the pipeline for a given activity
- #
- # Args:
- # elements (list of elements): The elements to load recursively
- # except_ (list of elements): The elements to except
- # rewritable (bool): Whether we should load the YAML files for roundtripping
- # use_configured_remote_caches (bool): Whether we should contact remotes
- # add_remote_cache (str): The URL for an explicitly mentioned remote cache
- # track_elements (list of elements): Elements which are to be tracked
- # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
- # track_selection (PipelineSelection): The selection algorithm for track elements
- # fetch_subprojects (bool): Whether to fetch subprojects while loading
- #
- # Note that the except_ argument may have a subtly different meaning depending
- # on the activity performed on the Pipeline. In normal circumstances the except_
- # argument excludes elements from the `elements` list. In a build session, the
- # except_ elements are excluded from the tracking plan.
- #
- def init_pipeline(self, elements, *,
- except_=tuple(),
- rewritable=False,
- use_configured_remote_caches=False,
- add_remote_cache=None,
- track_elements=None,
- track_cross_junctions=False,
- track_selection=PipelineSelection.ALL,
- fetch_subprojects=True):
-
- profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
- self._pipeline = Pipeline(self._context, self._project, self._artifacts,
- elements, except_,
- rewritable=rewritable,
- fetch_subprojects=fetch_subprojects)
-
- # After loading the projects, but before resolving cache keys,
- # we need to initialize remote artifact caches where relevant
- #
- self._artifacts.setup_remotes(use_config=use_configured_remote_caches,
- remote_url=add_remote_cache)
-
- # Now complete the initialization
- #
- self._pipeline.initialize(track_elements=track_elements,
- track_cross_junctions=track_cross_junctions,
- track_selection=track_selection)
-
- profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
- # Get the total
- self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
-
- if self._loaded_cb is not None:
- self._loaded_cb(self._pipeline)
diff --git a/tests/frontend/buildtrack.py b/tests/frontend/buildtrack.py
index 84d543e52..3f0a3adbe 100644
--- a/tests/frontend/buildtrack.py
+++ b/tests/frontend/buildtrack.py
@@ -31,40 +31,24 @@ def create_element(repo, name, path, dependencies, ref=None):
@pytest.mark.datafiles(os.path.join(DATA_DIR))
@pytest.mark.parametrize("ref_storage", [('inline'), ('project.refs')])
-@pytest.mark.parametrize("exceptions,excepted", [
+@pytest.mark.parametrize("track_targets,exceptions,tracked", [
# Test with no exceptions
- ([], []),
+ (['0.bst'], [], ['0.bst', '2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
+ (['3.bst'], [], ['3.bst', '4.bst', '5.bst', '6.bst']),
+ (['2.bst', '3.bst'], [], ['2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
# Test excepting '2.bst'
- (['2.bst'], ['2.bst', '7.bst']),
+ (['0.bst'], ['2.bst'], ['0.bst', '3.bst', '4.bst', '5.bst', '6.bst']),
+ (['3.bst'], ['2.bst'], []),
+ (['2.bst', '3.bst'], ['2.bst'], ['3.bst', '4.bst', '5.bst', '6.bst']),
# Test excepting '2.bst' and '3.bst'
- (['2.bst', '3.bst'], [
- '2.bst', '3.bst', '4.bst',
- '5.bst', '6.bst', '7.bst'
- ])
+ (['0.bst'], ['2.bst', '3.bst'], ['0.bst']),
+ (['3.bst'], ['2.bst', '3.bst'], []),
+ (['2.bst', '3.bst'], ['2.bst', '3.bst'], [])
])
-@pytest.mark.parametrize("track_targets,tracked", [
- # Test tracking the main target element
- (['0.bst'], [
- '0.bst', '2.bst', '3.bst',
- '4.bst', '5.bst', '6.bst', '7.bst'
- ]),
-
- # Test tracking a child element
- (['3.bst'], [
- '3.bst', '4.bst', '5.bst',
- '6.bst'
- ]),
-
- # Test tracking multiple children
- (['2.bst', '3.bst'], [
- '2.bst', '3.bst', '4.bst',
- '5.bst', '6.bst', '7.bst'
- ])
-])
-def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
- exceptions, tracked, excepted):
+def test_build_track(cli, datafiles, tmpdir, ref_storage,
+ track_targets, exceptions, tracked):
project = os.path.join(datafiles.dirname, datafiles.basename)
dev_files_path = os.path.join(project, 'files', 'dev-files')
element_path = os.path.join(project, 'elements')
@@ -102,7 +86,7 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
for element, dependencies in create_elements.items():
# Test the element inconsistency resolution by ensuring that
# only elements that aren't tracked have refs
- if element in set(tracked) - set(excepted):
+ if element in set(tracked):
# Elements which should not have a ref set
#
create_element(repo, element, element_path, dependencies)
@@ -133,14 +117,14 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
result = cli.run(project=project, silent=True, args=args)
tracked_elements = result.get_tracked_elements()
- assert set(tracked_elements) == set(tracked) - set(excepted)
+ assert set(tracked_elements) == set(tracked)
# Delete element sources
source_dir = os.path.join(project, 'cache', 'sources')
shutil.rmtree(source_dir)
# Delete artifacts one by one and assert element states
- for target in set(tracked) - set(excepted):
+ for target in set(tracked):
cli.remove_artifact_from_cache(project, target)
# Assert that it's tracked
@@ -154,40 +138,24 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
@pytest.mark.datafiles(os.path.join(DATA_DIR))
-@pytest.mark.parametrize("exceptions,excepted", [
+@pytest.mark.parametrize("track_targets,exceptions,tracked", [
# Test with no exceptions
- ([], []),
+ (['0.bst'], [], ['0.bst', '2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
+ (['3.bst'], [], ['3.bst', '4.bst', '5.bst', '6.bst']),
+ (['2.bst', '3.bst'], [], ['2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
# Test excepting '2.bst'
- (['2.bst'], ['2.bst', '7.bst']),
+ (['0.bst'], ['2.bst'], ['0.bst', '3.bst', '4.bst', '5.bst', '6.bst']),
+ (['3.bst'], ['2.bst'], []),
+ (['2.bst', '3.bst'], ['2.bst'], ['3.bst', '4.bst', '5.bst', '6.bst']),
# Test excepting '2.bst' and '3.bst'
- (['2.bst', '3.bst'], [
- '2.bst', '3.bst', '4.bst',
- '5.bst', '6.bst', '7.bst'
- ])
-])
-@pytest.mark.parametrize("track_targets,tracked", [
- # Test tracking the main target element
- (['0.bst'], [
- '0.bst', '2.bst', '3.bst',
- '4.bst', '5.bst', '6.bst', '7.bst'
- ]),
-
- # Test tracking a child element
- (['3.bst'], [
- '3.bst', '4.bst', '5.bst',
- '6.bst'
- ]),
-
- # Test tracking multiple children
- (['2.bst', '3.bst'], [
- '2.bst', '3.bst', '4.bst',
- '5.bst', '6.bst', '7.bst'
- ])
+ (['0.bst'], ['2.bst', '3.bst'], ['0.bst']),
+ (['3.bst'], ['2.bst', '3.bst'], []),
+ (['2.bst', '3.bst'], ['2.bst', '3.bst'], [])
])
def test_build_track_update(cli, datafiles, tmpdir, track_targets,
- exceptions, tracked, excepted):
+ exceptions, tracked):
project = os.path.join(datafiles.dirname, datafiles.basename)
dev_files_path = os.path.join(project, 'files', 'dev-files')
element_path = os.path.join(project, 'elements')
@@ -231,7 +199,7 @@ def test_build_track_update(cli, datafiles, tmpdir, track_targets,
result = cli.run(project=project, silent=True, args=args)
tracked_elements = result.get_tracked_elements()
- assert set(tracked_elements) == set(tracked) - set(excepted)
+ assert set(tracked_elements) == set(tracked)
@pytest.mark.datafiles(os.path.join(DATA_DIR))
diff --git a/tests/plugins/pipeline.py b/tests/plugins/pipeline.py
index db683094b..65929cf50 100644
--- a/tests/plugins/pipeline.py
+++ b/tests/plugins/pipeline.py
@@ -23,23 +23,25 @@ def create_pipeline(tmpdir, basedir, target):
context.set_message_handler(dummy_handler)
- return Pipeline(context, project, None, [target], [])
+ pipeline = Pipeline(context, project, None)
+ targets, = pipeline.load([(target,)])
+ return targets
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'customsource'))
def test_customsource(datafiles, tmpdir):
basedir = os.path.join(datafiles.dirname, datafiles.basename)
- pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
- assert(pipeline.targets[0].get_kind() == "autotools")
+ targets = create_pipeline(tmpdir, basedir, 'simple.bst')
+ assert(targets[0].get_kind() == "autotools")
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'customelement'))
def test_customelement(datafiles, tmpdir):
basedir = os.path.join(datafiles.dirname, datafiles.basename)
- pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
- assert(pipeline.targets[0].get_kind() == "foo")
+ targets = create_pipeline(tmpdir, basedir, 'simple.bst')
+ assert(targets[0].get_kind() == "foo")
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'badversionsource'))
@@ -47,7 +49,7 @@ def test_badversionsource(datafiles, tmpdir):
basedir = os.path.join(datafiles.dirname, datafiles.basename)
with pytest.raises(LoadError) as exc:
- pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
+ targets = create_pipeline(tmpdir, basedir, 'simple.bst')
assert exc.value.reason == LoadErrorReason.UNSUPPORTED_PLUGIN
@@ -57,6 +59,6 @@ def test_badversionelement(datafiles, tmpdir):
basedir = os.path.join(datafiles.dirname, datafiles.basename)
with pytest.raises(LoadError) as exc:
- pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
+ targets = create_pipeline(tmpdir, basedir, 'simple.bst')
assert exc.value.reason == LoadErrorReason.UNSUPPORTED_PLUGIN