author    Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>  2018-05-03 15:14:38 +0900
committer Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>  2018-05-08 03:59:38 +0900
commit    5bdc0a79d1fb67d2da552a902163dec450ff292c (patch)
tree      ee63ed73e90dbc9aba1d08971e54016bb7839708
parent    b8e15706a51272e4f4e116d9e373fd2581102868 (diff)
download  buildstream-5bdc0a79d1fb67d2da552a902163dec450ff292c.tar.gz
_stream.py, _pipeline.py: Refactoring of the pipeline itself
Here the pipeline becomes essentially stateless; some dangling state remains to be factored out because of frontend accesses, which will be changed in a later commit.

Essentially, the Pipeline.load() method no longer has any knowledge of the specific purposes of the loaded targets: it now takes a list of target groups and returns a corresponding list of element groups. The Stream() business logic methods now use other pipeline helper methods to create and filter lists from the loaded target elements.

The Stream() also finally absorbs the Scheduler frontend-facing APIs. However, Queues are still exposed on the Stream object for logging purposes and through callbacks, such that the frontend can retry elements.
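For reference, the group-based contract that Pipeline.load() gains here: callers pass a list of target-name groups and receive the resolved Element objects back in groups of matching shape, which is how Stream._load() unpacks its primary, except, tracking and tracking-except selections in the diff below. A minimal, illustrative sketch of that calling pattern (the element names are hypothetical; a constructed Pipeline instance is assumed):

    # Sketch of the new Pipeline.load() group API (illustrative only).
    # Assumes `pipeline` is a Pipeline(context, project, artifacts) instance.
    target_groups = [
        ["app.bst"],          # primary targets (hypothetical names)
        ["docs.bst"],         # --except targets
        ["base/alpine.bst"],  # tracking targets
        [],                   # tracking --except targets
    ]

    # load() flattens the groups for the Loader, resolves Elements, then
    # splits them back into groups matching the input shape.
    elements, except_elements, track_elements, track_except_elements = \
        pipeline.load(target_groups,
                      rewritable=bool(target_groups[2]),
                      fetch_subprojects=True)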
-rw-r--r--  buildstream/_frontend/app.py     |  70
-rw-r--r--  buildstream/_frontend/status.py  |  25
-rw-r--r--  buildstream/_frontend/widget.py  |  37
-rw-r--r--  buildstream/_pipeline.py         | 369
-rw-r--r--  buildstream/_stream.py           | 688
-rw-r--r--  tests/frontend/buildtrack.py     |  86
-rw-r--r--  tests/plugins/pipeline.py        |  16
7 files changed, 685 insertions(+), 606 deletions(-)
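The other half of the change is visible in the app.py hunks below: the frontend no longer constructs its own Scheduler. Stream() now takes the session start time plus the scheduler callbacks, and forwards the scheduler state the frontend needs (running, suspended, terminated, elapsed_time, terminate(), quit(), suspend()). A rough sketch of the frontend side after this commit, assuming a loaded Context/Project and callback helpers like those in app.py:

    import datetime

    # Sketch only: `context`, `project` and the callbacks are assumed to exist
    # as they do in buildstream/_frontend/app.py after this commit.
    session_start = datetime.datetime.now()
    stream = Stream(context, project, session_start,
                    session_start_callback=session_start_cb,
                    interrupt_callback=interrupt_handler,
                    ticker_callback=tick,
                    job_start_callback=job_started,
                    job_complete_callback=job_completed)

    # Scheduler state is now queried through Stream rather than App.scheduler:
    if stream.running and not stream.terminated:
        stream.terminate()   # was scheduler.terminate_jobs()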
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 58a8eab89..de23c12b0 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -39,7 +39,6 @@ from .._project import Project
from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
-from .._scheduler import Scheduler
from .._versions import BST_FORMAT_VERSION
from .. import __version__ as build_stream_version
from .. import _yaml
@@ -69,7 +68,6 @@ class App():
self.context = None # The Context object
self.stream = None # The Stream object
self.project = None # The toplevel Project object
- self.scheduler = None # The Scheduler
self.logger = None # The LogLine object
self.interactive = None # Whether we are running in interactive mode
self.colors = None # Whether to use colors in logging
@@ -83,6 +81,7 @@ class App():
self._status = None # The Status object
self._fail_messages = {} # Failure messages by unique plugin id
self._interactive_failures = None # Whether to handle failures interactively
+ self._started = False # Whether a session has started
# UI Colors Profiles
self._content_profile = Profile(fg='yellow')
@@ -202,14 +201,12 @@ class App():
self._error_exit(e, "Error loading project")
# Create the stream right away, we'll need to pass it around
- self.stream = Stream(self.context, self.project, self.loaded_cb)
-
- # Create the application's scheduler
- self.scheduler = Scheduler(self.context, self._session_start,
- interrupt_callback=self._interrupt_handler,
- ticker_callback=self._tick,
- job_start_callback=self._job_started,
- job_complete_callback=self._job_completed)
+ self.stream = Stream(self.context, self.project, self._session_start,
+ session_start_callback=self.session_start_cb,
+ interrupt_callback=self._interrupt_handler,
+ ticker_callback=self._tick,
+ job_start_callback=self._job_started,
+ job_complete_callback=self._job_completed)
# Create the logger right before setting the message handler
self.logger = LogLine(self.context,
@@ -224,8 +221,7 @@ class App():
self._status = Status(self.context,
self._content_profile, self._format_profile,
self._success_profile, self._error_profile,
- self.stream, self.scheduler,
- colors=self.colors)
+ self.stream, colors=self.colors)
# Propagate pipeline feedback to the user
self.context.set_message_handler(self._message_handler)
@@ -238,10 +234,6 @@ class App():
if session_name:
self._message(MessageType.START, session_name)
- # XXX This is going to change soon !
- #
- self.stream._scheduler = self.scheduler
-
# Run the body of the session here, once everything is loaded
try:
yield
@@ -249,14 +241,15 @@ class App():
# Print a nice summary if this is a session
if session_name:
- elapsed = self.scheduler.elapsed_time()
+ elapsed = self.stream.elapsed_time
if isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member
self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
else:
self._message(MessageType.FAIL, session_name, elapsed=elapsed)
- self._print_summary()
+ if self._started:
+ self._print_summary()
# Exit with the error
self._error_exit(e)
@@ -264,8 +257,9 @@ class App():
else:
# No exceptions occurred, print session time and summary
if session_name:
- self._message(MessageType.SUCCESS, session_name, elapsed=self.scheduler.elapsed_time())
- self._print_summary()
+ self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+ if self._started:
+ self._print_summary()
# init_project()
#
@@ -400,8 +394,8 @@ class App():
# If the scheduler has started, try to terminate all jobs gracefully,
# otherwise exit immediately.
- if self.scheduler.loop:
- self.scheduler.terminate_jobs()
+ if self.stream.running:
+ self.stream.terminate()
else:
sys.exit(-1)
@@ -411,8 +405,8 @@ class App():
def _maybe_render_status(self):
# If we're suspended or terminating, then dont render the status area
- if self._status and self.scheduler and \
- not (self.scheduler.suspended or self.scheduler.terminated):
+ if self._status and self.stream and \
+ not (self.stream.suspended or self.stream.terminated):
self._status.render()
#
@@ -423,7 +417,7 @@ class App():
# Only handle ^C interactively in interactive mode
if not self.interactive:
self._status.clear()
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
return
# Here we can give the user some choices, like whether they would
@@ -452,11 +446,11 @@ class App():
if choice == 'terminate':
click.echo("\nTerminating all jobs at user request\n", err=True)
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
else:
if choice == 'quit':
click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
- self.scheduler.stop_queueing()
+ self.stream.quit()
elif choice == 'continue':
click.echo("\nContinuing\n", err=True)
@@ -473,7 +467,7 @@ class App():
# Dont attempt to handle a failure if the user has already opted to
# terminate
- if not success and not self.scheduler.terminated:
+ if not success and not self.stream.terminated:
# Get the last failure message for additional context
failure = self._fail_messages.get(element._get_unique_id())
@@ -494,9 +488,9 @@ class App():
if not self._interactive_failures:
if self.context.sched_error_action == 'terminate':
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
elif self.context.sched_error_action == 'quit':
- self.scheduler.stop_queueing()
+ self.stream.quit()
elif self.context.sched_error_action == 'continue':
pass
return
@@ -551,11 +545,11 @@ class App():
if choice == 'terminate':
click.echo("\nTerminating all jobs\n", err=True)
- self.scheduler.terminate_jobs()
+ self.stream.terminate()
else:
if choice == 'quit':
click.echo("\nCompleting ongoing tasks before quitting\n", err=True)
- self.scheduler.stop_queueing()
+ self.stream.quit()
elif choice == 'continue':
click.echo("\nContinuing with other non failing elements\n", err=True)
elif choice == 'retry':
@@ -567,10 +561,12 @@ class App():
# Print the session heading if we've loaded a pipeline and there
# is going to be a session
#
- def loaded_cb(self, pipeline):
+ def session_start_cb(self):
+ self._started = True
if self._session_name:
- self.logger.print_heading(pipeline,
- self._main_options['log_file'],
+ self.logger.print_heading(self.project,
+ self.stream,
+ log_file=self._main_options['log_file'],
styling=self.colors)
#
@@ -578,7 +574,7 @@ class App():
#
def _print_summary(self):
click.echo("", err=True)
- self.logger.print_summary(self.stream, self.scheduler,
+ self.logger.print_summary(self.stream,
self._main_options['log_file'],
styling=self.colors)
@@ -642,7 +638,7 @@ class App():
def _interrupted(self):
self._status.clear()
try:
- with self.scheduler.jobs_suspended():
+ with self.stream.suspend():
yield
finally:
self._maybe_render_status()
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index 13e00f58f..0e5855181 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -39,7 +39,6 @@ from .widget import TimeCode
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
# stream (Stream): The Stream
-# scheduler (Scheduler): The Scheduler
# colors (bool): Whether to print the ANSI color codes in the output
#
class Status():
@@ -47,14 +46,14 @@ class Status():
def __init__(self, context,
content_profile, format_profile,
success_profile, error_profile,
- stream, scheduler, colors=False):
+ stream, colors=False):
self._context = context
self._content_profile = content_profile
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
- self._scheduler = scheduler
+ self._stream = stream
self._jobs = []
self._last_lines = 0 # Number of status lines we last printed to console
self._term = Terminal()
@@ -63,7 +62,7 @@ class Status():
self._header = _StatusHeader(context,
content_profile, format_profile,
success_profile, error_profile,
- stream, scheduler)
+ stream)
self._term_width, _ = click.get_terminal_size()
self._alloc_lines = 0
@@ -80,7 +79,7 @@ class Status():
# action_name (str): The action name for this job
#
def add_job(self, element, action_name):
- elapsed = self._scheduler.elapsed_time()
+ elapsed = self._stream.elapsed_time
job = _StatusJob(self._context, element, action_name, self._content_profile, self._format_profile, elapsed)
self._jobs.append(job)
self._need_alloc = True
@@ -136,7 +135,7 @@ class Status():
if not self._term.does_styling:
return
- elapsed = self._scheduler.elapsed_time()
+ elapsed = self._stream.elapsed_time
self.clear()
self._check_term_width()
@@ -251,14 +250,13 @@ class Status():
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
# stream (Stream): The Stream
-# scheduler (Scheduler): The Scheduler
#
class _StatusHeader():
def __init__(self, context,
content_profile, format_profile,
success_profile, error_profile,
- stream, scheduler):
+ stream):
#
# Public members
@@ -273,7 +271,6 @@ class _StatusHeader():
self._success_profile = success_profile
self._error_profile = error_profile
self._stream = stream
- self._scheduler = scheduler
self._time_code = TimeCode(context, content_profile, format_profile)
self._context = context
@@ -283,8 +280,8 @@ class _StatusHeader():
size = 0
text = ''
- session = str(self._stream.session_elements)
- total = str(self._stream.total_elements)
+ session = str(len(self._stream.session_elements))
+ total = str(len(self._stream.total_elements))
# Format and calculate size for target and overall time code
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
@@ -303,10 +300,10 @@ class _StatusHeader():
text = ''
# Format and calculate size for each queue progress
- for queue in self._scheduler.queues:
+ for queue in self._stream.queues:
# Add spacing
- if self._scheduler.queues.index(queue) > 0:
+ if self._stream.queues.index(queue) > 0:
size += 2
text += self._format_profile.fmt('→ ')
@@ -366,7 +363,7 @@ class _StatusHeader():
# action_name (str): The name of the action
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
-# elapsed (datetime): The offset of the scheduler when this job is created
+# elapsed (datetime): The offset into the session when this job is created
#
class _StatusJob():
diff --git a/buildstream/_frontend/widget.py b/buildstream/_frontend/widget.py
index 814f87ff5..5b405682a 100644
--- a/buildstream/_frontend/widget.py
+++ b/buildstream/_frontend/widget.py
@@ -28,7 +28,7 @@ import click
from ruamel import yaml
from . import Profile
-from .. import Element, Scope, Consistency
+from .. import Element, Consistency
from .. import _yaml
from .. import __version__ as bst_version
from .._exceptions import ImplError
@@ -435,23 +435,17 @@ class LogLine(Widget):
# and so on.
#
# Args:
- # pipeline (Pipeline): The pipeline to print the heading of
+ # project (Project): The toplevel project we were invoked from
+ # stream (Stream): The stream
# log_file (file): An optional file handle for additional logging
- # deps (list): Optional list of elements, default is to use the whole pipeline
# styling (bool): Whether to enable ansi escape codes in the output
#
- def print_heading(self, pipeline, log_file, deps=None, styling=False):
+ def print_heading(self, project, stream, *, log_file, styling=False):
context = self.context
- project = pipeline.project
starttime = datetime.datetime.now()
text = ''
- assert self._resolved_keys is None
- elements = set()
- visited = {}
- for element in pipeline.targets:
- elements.update(element.dependencies(Scope.ALL, visited=visited))
- self._resolved_keys = {element: element._get_cache_key() for element in elements}
+ self._resolved_keys = {element: element._get_cache_key() for element in stream.session_elements}
# Main invocation context
text += '\n'
@@ -459,7 +453,7 @@ class LogLine(Widget):
values = OrderedDict()
values["Session Start"] = starttime.strftime('%A, %d-%m-%Y at %H:%M:%S')
values["Project"] = "{} ({})".format(project.name, project.directory)
- values["Targets"] = ", ".join([t.name for t in pipeline.targets])
+ values["Targets"] = ", ".join([t.name for t in stream.targets])
text += self._format_values(values)
# User configurations
@@ -494,9 +488,7 @@ class LogLine(Widget):
# Pipeline state
text += self.content_profile.fmt("Pipeline\n", bold=True)
- if deps is None:
- deps = pipeline.dependencies(Scope.ALL)
- text += self.show_pipeline(deps, context.log_element_format)
+ text += self.show_pipeline(stream.total_elements, context.log_element_format)
text += '\n'
# Separator line before following output
@@ -512,16 +504,15 @@ class LogLine(Widget):
#
# Args:
# stream (Stream): The Stream
- # scheduler (Scheduler): The Scheduler
# log_file (file): An optional file handle for additional logging
# styling (bool): Whether to enable ansi escape codes in the output
#
- def print_summary(self, stream, scheduler, log_file, styling=False):
+ def print_summary(self, stream, log_file, styling=False):
# Early silent return if there are no queues, can happen
- # only in the case that the pipeline early returned due to
+ # only in the case that the stream early returned due to
# an inconsistent pipeline state.
- if scheduler.queues is None:
+ if not stream.queues:
return
text = ''
@@ -544,18 +535,18 @@ class LogLine(Widget):
text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
values = OrderedDict()
- values['Total'] = self.content_profile.fmt(str(stream.total_elements))
- values['Session'] = self.content_profile.fmt(str(stream.session_elements))
+ values['Total'] = self.content_profile.fmt(str(len(stream.total_elements)))
+ values['Session'] = self.content_profile.fmt(str(len(stream.session_elements)))
processed_maxlen = 1
skipped_maxlen = 1
failed_maxlen = 1
- for queue in scheduler.queues:
+ for queue in stream.queues:
processed_maxlen = max(len(str(len(queue.processed_elements))), processed_maxlen)
skipped_maxlen = max(len(str(len(queue.skipped_elements))), skipped_maxlen)
failed_maxlen = max(len(str(len(queue.failed_elements))), failed_maxlen)
- for queue in scheduler.queues:
+ for queue in stream.queues:
processed = str(len(queue.processed_elements))
skipped = str(len(queue.skipped_elements))
failed = str(len(queue.failed_elements))
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 5327d3fb6..8861556c9 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -18,13 +18,16 @@
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
# Jürg Billeter <juerg.billeter@codethink.co.uk>
+# Tristan Maat <tristan.maat@codethink.co.uk>
+import os
import itertools
from operator import itemgetter
from ._exceptions import PipelineError
from ._message import Message, MessageType
from ._loader import Loader
+from ._profile import Topics, profile_start, profile_end
from .element import Element
from . import Scope, Consistency
from ._project import ProjectRefStorage
@@ -60,60 +63,59 @@ class PipelineSelection():
# Pipeline()
#
# Args:
-# context (Context): The Context object
# project (Project): The Project object
-# target (str): A bst filename relative to the project directory
-# inconsistent (bool): Whether to load the pipeline in a forcefully inconsistent state,
-# this is appropriate when source tracking will run and the
-# current source refs will not be the effective refs.
-# rewritable (bool): Whether the loaded files should be rewritable
-# this is a bit more expensive due to deep copies
-# fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
-# loading process, if they are not yet locally cached
-#
-# The ticker methods will be called with an element name for each tick, a final
-# tick with None as the argument is passed to signal that processing of this
-# stage has terminated.
-#
-# Raises:
-# LoadError
-# PluginError
-# SourceError
-# ElementError
-# ProgramNotFoundError
+# context (Context): The Context object
+# artifacts (Context): The ArtifactCache object
#
class Pipeline():
- def __init__(self, context, project, artifacts, targets, except_, *,
- rewritable=False,
- fetch_subprojects=True):
+ def __init__(self, context, project, artifacts):
- self.context = context # The Context
- self.project = project # The toplevel project
- self.targets = [] # List of toplevel target Element objects
+ self._context = context # The Context
+ self._project = project # The toplevel project
#
# Private members
#
self._artifacts = artifacts
self._loader = None
- self._exceptions = None
- self._track_cross_junctions = False
- self._track_elements = []
- #
- # Early initialization
- #
+ # load()
+ #
+ # Loads elements from target names.
+ #
+ # This function is called with a list of lists, such that multiple
+ # target groups may be specified. Element names specified in `targets`
+ # are allowed to be redundant.
+ #
+ # Args:
+ # target_groups (list of lists): Groups of toplevel targets to load
+ # fetch_subprojects (bool): Whether we should fetch subprojects as a part of the
+ # loading process, if they are not yet locally cached
+ # rewritable (bool): Whether the loaded files should be rewritable
+ # this is a bit more expensive due to deep copies
+ #
+ # Returns:
+ # (tuple of lists): A tuple of grouped Element objects corresponding to target_groups
+ #
+ def load(self, target_groups, *,
+ fetch_subprojects=True,
+ rewritable=False):
+
+ # First concatenate all the lists for the loader's sake
+ targets = list(itertools.chain(*target_groups))
- self._loader = Loader(self.context, self.project, targets + except_,
+ profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))
+
+ self._loader = Loader(self._context, self._project, targets,
fetch_subprojects=fetch_subprojects)
- with self.context.timed_activity("Loading pipeline", silent_nested=True):
+ with self._context.timed_activity("Loading pipeline", silent_nested=True):
meta_elements = self._loader.load(rewritable, None)
# Resolve the real elements now that we've loaded the project
- with self.context.timed_activity("Resolving pipeline"):
- resolved_elements = [
+ with self._context.timed_activity("Resolving pipeline"):
+ elements = [
Element._new_from_meta(meta, self._artifacts)
for meta in meta_elements
]
@@ -130,61 +132,89 @@ class Pipeline():
detail += "\n".join(lines)
self._message(MessageType.WARN, "Ignoring redundant source references", detail=detail)
- self.targets = resolved_elements[:len(targets)]
- self._exceptions = resolved_elements[len(targets):]
+ # Now create element groups to match the input target groups
+ elt_iter = iter(elements)
+ element_groups = [
+ [next(elt_iter) for i in range(len(group))]
+ for group in target_groups
+ ]
+
+ profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in targets))
+
+ return tuple(element_groups)
- # initialize()
+ # resolve_elements()
#
- # Initialize the pipeline
+ # Resolve element state and cache keys.
#
# Args:
- # track_element (list of Elements): List of elements specified by the frontend for tracking
- # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
- # track_selection (PipelineSelection): The selection algorithm for track elements
+ # targets (list of Element): The list of toplevel element targets
#
- def initialize(self,
- track_elements=None,
- track_cross_junctions=False,
- track_selection=PipelineSelection.ALL):
+ def resolve_elements(self, targets):
+ with self._context.timed_activity("Resolving cached state", silent_nested=True):
+ for element in self.dependencies(targets, Scope.ALL):
- # Preflight directly, before ever interrogating caches or anything.
- self._preflight()
+ # Preflight
+ element._preflight()
- # Work out what we're going track, if anything
- self._track_cross_junctions = track_cross_junctions
- if track_elements:
- self._track_elements = self._get_elements_to_track(track_elements, track_selection)
-
- # Now resolve the cache keys once tracking elements have been resolved
- self._resolve_cache_keys()
+ # Determine initial element state.
+ element._update_state()
- # cleanup()
+ # dependencies()
#
- # Cleans up resources used by the Pipeline.
+ # Generator function to iterate over elements and optionally
+ # also iterate over sources.
#
- def cleanup(self):
- if self._loader:
- self._loader.cleanup()
+ # Args:
+ # targets (list of Element): The target Elements to loop over
+ # scope (Scope): The scope to iterate over
+ # recurse (bool): Whether to recurse into dependencies
+ #
+ def dependencies(self, targets, scope, *, recurse=True):
+ # Keep track of 'visited' in this scope, so that all targets
+ # share the same context.
+ visited = {}
- # Reset the element loader state
- Element._reset_load_state()
+ for target in targets:
+ for element in target.dependencies(scope, recurse=recurse, visited=visited):
+ yield element
+
+ # plan()
+ #
+ # Generator function to iterate over only the elements
+ # which are required to build the pipeline target, omitting
+ # cached elements. The elements are yielded in a depth sorted
+ # ordering for optimal build plans
+ #
+ # Args:
+ # elements (list of Element): List of target elements to plan
+ #
+ # Returns:
+ # (list of Element): A depth sorted list of the build plan
+ #
+ def plan(self, elements):
+ return _Planner().plan(elements)
# get_selection()
#
+ # Gets a full list of elements based on a toplevel
+ # list of element targets
+ #
# Args:
+ # targets (list of Element): The target Elements
# mode (PipelineSelection): The PipelineSelection mode
#
# Various commands define a --deps option to specify what elements to
# use in the result, this function reports a list that is appropriate for
# the selected option.
#
- def get_selection(self, mode):
+ def get_selection(self, targets, mode):
elements = None
if mode == PipelineSelection.NONE:
- elements = self.targets
+ elements = targets
elif mode == PipelineSelection.PLAN:
- elements = list(self._plan())
+ elements = self.plan(targets)
else:
if mode == PipelineSelection.ALL:
scope = Scope.ALL
@@ -193,49 +223,27 @@ class Pipeline():
elif mode == PipelineSelection.RUN:
scope = Scope.RUN
- elements = list(self.dependencies(scope))
-
- return self.remove_elements(elements)
-
- # dependencies()
- #
- # Generator function to iterate over elements and optionally
- # also iterate over sources.
- #
- # Args:
- # scope (Scope): The scope to iterate over
- # recurse (bool): Whether to recurse into dependencies
- # include_sources (bool): Whether to include element sources in iteration
- #
- def dependencies(self, scope, *, recurse=True, include_sources=False):
- # Keep track of 'visited' in this scope, so that all targets
- # share the same context.
- visited = {}
-
- for target in self.targets:
- for element in target.dependencies(scope, recurse=recurse, visited=visited):
- if include_sources:
- for source in element.sources():
- yield source
- yield element
+ elements = list(self.dependencies(targets, scope))
- #############################################################
- # Commands #
- #############################################################
+ return elements
- # remove_elements():
- #
- # Internal function
+ # except_elements():
#
# Return what we are left with after the intersection between
# excepted and target elements and their unique dependencies is
# gone.
#
# Args:
- # elements (list of elements): The list to remove elements from.
- def remove_elements(self, elements):
- targeted = list(self.dependencies(Scope.ALL))
-
+ # targets (list of Element): List of toplevel targetted elements
+ # elements (list of Element): The list to remove elements from
+ # except_targets (list of Element): List of toplevel except targets
+ #
+ # Returns:
+ # (list of Element): The elements list with the intersected
+ # exceptions removed
+ #
+ def except_elements(self, targets, elements, except_targets):
+ targeted = list(self.dependencies(targets, Scope.ALL))
visited = []
def find_intersection(element):
@@ -255,7 +263,7 @@ class Pipeline():
# elements that lie on the border closest to excepted elements
# between excepted and target elements.
intersection = list(itertools.chain.from_iterable(
- find_intersection(element) for element in self._exceptions
+ find_intersection(element) for element in except_targets
))
# Now use this set of elements to traverse the targeted
@@ -264,7 +272,7 @@ class Pipeline():
queue = []
visited = []
- queue.extend(self.targets)
+ queue.extend(targets)
while queue:
element = queue.pop()
if element in visited or element in intersection:
@@ -282,89 +290,77 @@ class Pipeline():
# in before.
return [element for element in elements if element in visited]
- #############################################################
- # Private Methods #
- #############################################################
-
- # _get_elements_to_track():
+ # targets_include()
+ #
+ # Checks whether the given targets are, or depend on some elements
#
- # Work out which elements are going to be tracked.
+ # Args:
+ # targets (list of Element): A list of targets
+ # elements (list of Element): List of elements to check
+ #
+ # Returns:
+ # (bool): True if all of `elements` are the `targets`, or are
+ # somehow depended on by `targets`.
#
- # Currently the 'mode' parameter only accepts
- # PipelineSelection.NONE or PipelineSelection.ALL
+ def targets_include(self, targets, elements):
+ target_element_set = set(self.dependencies(targets, Scope.ALL))
+ element_set = set(elements)
+ return element_set.issubset(target_element_set)
+
+ # subtract_elements()
#
- # This makes the assumption that the except elements are
- # meant to be removed from tracking element lists.
+ # Subtract a subset of elements
#
# Args:
- # track_targets (list of str): List of target names
- # mode (PipelineSelection): The PipelineSelection mode
+ # elements (list of Element): The element list
+ # subtract (list of Element): List of elements to subtract from elements
#
# Returns:
- # (list): List of Element objects to track
+ # (list): The original elements list, with elements in subtract removed
#
- def _get_elements_to_track(self, track_targets, mode=PipelineSelection.ALL):
- planner = _Planner()
-
- # Convert target names to elements
- track_elements = [e for e in self.dependencies(Scope.ALL)
- if e.name in track_targets]
-
- if mode != PipelineSelection.NONE:
- assert mode == PipelineSelection.ALL
-
- # Plan them out
- track_elements = planner.plan(track_elements, ignore_cache=True)
-
- # Filter out --except elements
- track_elements = self.remove_elements(track_elements)
-
- # Filter out cross junctioned elements
- if self._track_cross_junctions:
- self._assert_junction_tracking(track_elements)
- else:
- track_elements = self._filter_cross_junctions(track_elements)
-
- return track_elements
+ def subtract_elements(self, elements, subtract):
+ subtract_set = set(subtract)
+ return [
+ e for e in elements
+ if e not in subtract_set
+ ]
- # _prefilght()
+ # track_cross_junction_filter()
#
- # Preflights all the plugins in the pipeline
+ # Filters out elements which are across junction boundaries,
+ # otherwise asserts that there are no such elements.
#
- def _preflight(self):
- for element in self.dependencies(Scope.ALL):
- element._preflight()
-
- # _resolve_cache_keys()
+ # This is currently assumed to be only relevant for element
+ # lists targetted at tracking.
#
- # Initially resolve the cache keys
+ # Args:
+ # elements (list of Element): The list of elements to filter
+ # cross_junction_requested (bool): Whether the user requested
+ # cross junction tracking
#
- def _resolve_cache_keys(self):
- track_elements = set(self._track_elements)
-
- with self.context.timed_activity("Resolving cached state", silent_nested=True):
- for element in self.dependencies(Scope.ALL):
- if element in track_elements:
- # Load the pipeline in an explicitly inconsistent state, use
- # this for pipelines with tracking queues enabled.
- element._schedule_tracking()
+ # Returns:
+ # (list of Element): The filtered or asserted result
+ #
+ def track_cross_junction_filter(self, elements, cross_junction_requested):
+ # Filter out cross junctioned elements
+ if cross_junction_requested:
+ self._assert_junction_tracking(elements)
+ else:
+ elements = self._filter_cross_junctions(elements)
- # Determine initial element state. This may resolve cache keys
- # and interrogate the artifact cache.
- element._update_state()
+ return elements
- # _assert_consistent()
+ # assert_consistent()
#
- # Asserts that the pipeline is in a consistent state, that
- # is to say that all sources are consistent and can at least
- # be fetched.
+ # Asserts that the given list of elements are in a consistent state, that
+ # is to say that all sources are consistent and can at least be fetched.
#
# Consequently it also means that cache keys can be resolved.
#
- def _assert_consistent(self, toplevel):
+ def assert_consistent(self, elements):
inconsistent = []
- with self.context.timed_activity("Checking sources"):
- for element in toplevel:
+ with self._context.timed_activity("Checking sources"):
+ for element in elements:
if element._get_consistency() == Consistency.INCONSISTENT:
inconsistent.append(element)
@@ -375,6 +371,21 @@ class Pipeline():
detail += " " + element._get_full_name() + "\n"
raise PipelineError("Inconsistent pipeline", detail=detail, reason="inconsistent-pipeline")
+ # cleanup()
+ #
+ # Cleans up resources used by the Pipeline.
+ #
+ def cleanup(self):
+ if self._loader:
+ self._loader.cleanup()
+
+ # Reset the element loader state
+ Element._reset_load_state()
+
+ #############################################################
+ # Private Methods #
+ #############################################################
+
# _filter_cross_junction()
#
# Filters out cross junction elements from the elements
@@ -389,7 +400,7 @@ class Pipeline():
def _filter_cross_junctions(self, elements):
return [
element for element in elements
- if element._get_project() is self.project
+ if element._get_project() is self._project
]
# _assert_junction_tracking()
@@ -404,7 +415,7 @@ class Pipeline():
# We can track anything if the toplevel project uses project.refs
#
- if self.project.ref_storage == ProjectRefStorage.PROJECT_REFS:
+ if self._project.ref_storage == ProjectRefStorage.PROJECT_REFS:
return
# Ideally, we would want to report every cross junction element but not
@@ -414,37 +425,19 @@ class Pipeline():
# But this is too hard, lets shoot for a simple error.
for element in elements:
element_project = element._get_project()
- if element_project is not self.project:
+ if element_project is not self._project:
detail = "Requested to track sources across junction boundaries\n" + \
"in a project which does not use project.refs ref-storage."
raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
- # _plan()
- #
- # Args:
- # except_ (bool): Whether to filter out the except elements from the plan
- #
- # Generator function to iterate over only the elements
- # which are required to build the pipeline target, omitting
- # cached elements. The elements are yielded in a depth sorted
- # ordering for optimal build plans
- def _plan(self, except_=True):
- build_plan = _Planner().plan(self.targets)
-
- if except_:
- build_plan = self.remove_elements(build_plan)
-
- for element in build_plan:
- yield element
-
# _message()
#
# Local message propagator
#
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
- self.context.message(
+ self._context.message(
Message(None, message_type, message, **args))
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 62d9f9804..c8d0bb69c 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -17,19 +17,22 @@
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+# Tristan Maat <tristan.maat@codethink.co.uk>
+
import os
import stat
import shlex
import shutil
import tarfile
+from contextlib import contextmanager
from tempfile import TemporaryDirectory
-from ._exceptions import StreamError, ImplError, BstError
+from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
from ._message import Message, MessageType
-from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
from ._pipeline import Pipeline, PipelineSelection
from ._platform import Platform
-from ._profile import Topics, profile_start, profile_end
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -41,25 +44,46 @@ from . import Scope, Consistency
# Args:
# context (Context): The Context object
# project (Project): The Project object
-# loaded_callback (callable): A callback to invoke when the pipeline is loaded
+# session_start (datetime): The time when the session started
+# session_start_callback (callable): A callback to invoke when the session starts
+# interrupt_callback (callable): A callback to invoke when we get interrupted
+# ticker_callback (callable): Invoked every second while running the scheduler
+# job_start_callback (callable): Called when a job starts
+# job_complete_callback (callable): Called when a job completes
#
class Stream():
- def __init__(self, context, project, loaded_callback):
- self.session_elements = 0 # Number of elements to process in this session
- self.total_elements = 0 # Number of total potential elements for this pipeline
-
- self._context = context
- self._project = project
- self._scheduler = None
- self._pipeline = None
+ def __init__(self, context, project, session_start, *,
+ session_start_callback=None,
+ interrupt_callback=None,
+ ticker_callback=None,
+ job_start_callback=None,
+ job_complete_callback=None):
- self._loaded_cb = loaded_callback
+ #
+ # Public members
+ #
+ self.targets = [] # Resolved target elements
+ self.session_elements = [] # List of elements being processed this session
+ self.total_elements = [] # Total list of elements based on targets
+ self.queues = [] # Queue objects
- # Load selected platform
+ #
+ # Private members
+ #
Platform.create_instance(context, project)
self._platform = Platform.get_platform()
self._artifacts = self._platform.artifactcache
+ self._context = context
+ self._project = project
+ self._pipeline = Pipeline(context, project, self._artifacts)
+ self._scheduler = Scheduler(context, session_start,
+ interrupt_callback=interrupt_callback,
+ ticker_callback=ticker_callback,
+ job_start_callback=job_start_callback,
+ job_complete_callback=job_complete_callback)
+ self._first_non_track_queue = None
+ self._session_start_callback = session_start_callback
# cleanup()
#
@@ -81,14 +105,18 @@ class Stream():
# except_targets (list of str): Specified targets to except from fetching
# downloadable (bool): Whether the downloadable state of elements should be resolved
#
+ # Returns:
+ # (list of Element): The selected elements
def load_selection(self, targets, *,
selection=PipelineSelection.NONE,
except_targets=(),
downloadable=False):
- self.init_pipeline(targets, except_=except_targets,
- use_configured_remote_caches=downloadable,
- fetch_subprojects=False)
- return self._pipeline.get_selection(selection)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ except_targets=except_targets,
+ use_artifact_config=downloadable,
+ fetch_subprojects=False)
+ return elements
# shell()
#
@@ -118,7 +146,7 @@ class Stream():
if directory is None:
missing_deps = [
dep._get_full_name()
- for dep in self._pipeline.dependencies(scope)
+ for dep in self._pipeline.dependencies([element], scope)
if not dep._cached()
]
if missing_deps:
@@ -145,66 +173,47 @@ class Stream():
track_cross_junctions=False,
build_all=False):
- rewritable = False
- if track_targets:
- rewritable = True
+ if build_all or track_targets:
+ selection = PipelineSelection.ALL
+ else:
+ selection = PipelineSelection.PLAN
- self.init_pipeline(targets,
- except_=track_except,
- rewritable=rewritable,
- use_configured_remote_caches=True,
- track_elements=track_targets,
- track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True)
+ elements, track_elements = \
+ self._load(targets, track_targets,
+ selection=selection, track_selection=PipelineSelection.ALL,
+ track_except_targets=track_except,
+ track_cross_junctions=track_cross_junctions,
+ use_artifact_config=True,
+ fetch_subprojects=True)
- if build_all:
- plan = self._pipeline.dependencies(Scope.ALL)
- else:
- plan = self._pipeline._plan(except_=False)
-
- # We want to start the build queue with any elements that are
- # not being tracked first
- track_elements = set(self._pipeline._track_elements)
- plan = [e for e in plan if e not in track_elements]
-
- # Assert that we have a consistent pipeline now (elements in
- # track_plan will be made consistent)
- self._pipeline._assert_consistent(plan)
-
- fetch = FetchQueue(self._scheduler, skip_cached=True)
- build = BuildQueue(self._scheduler)
- track = None
- pull = None
- push = None
- queues = []
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- queues.append(track)
- if self._pipeline._artifacts.has_fetch_remotes():
- pull = PullQueue(self._scheduler)
- queues.append(pull)
- queues.append(fetch)
- queues.append(build)
- if self._pipeline._artifacts.has_push_remotes():
- push = PushQueue(self._scheduler)
- queues.append(push)
-
- # If we're going to track, tracking elements go into the first queue
- # which is the tracking queue, the rest of the plan goes into the next
- # queue (whatever that happens to be)
- if track:
- queues[0].enqueue(self._pipeline._track_elements)
- queues[1].enqueue(plan)
- else:
- queues[0].enqueue(plan)
+ # Remove the tracking elements from the main targets
+ elements = self._pipeline.subtract_elements(elements, track_elements)
- self.session_elements = len(self._pipeline._track_elements) + len(plan)
+ # Assert that the elements we're not going to track are consistent
+ self._pipeline.assert_consistent(elements)
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Now construct the queues
+ #
+ track_queue = None
+ if track_elements:
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+
+ if self._artifacts.has_fetch_remotes():
+ self._add_queue(PullQueue(self._scheduler))
+
+ self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
+ self._add_queue(BuildQueue(self._scheduler))
+
+ if self._artifacts.has_push_remotes():
+ self._add_queue(PushQueue(self._scheduler))
+
+ # Enqueue elements
+ #
+ if track_elements:
+ self._enqueue_plan(track_elements, queue=track_queue)
+ self._enqueue_plan(elements)
+ self._run()
# fetch()
#
@@ -223,21 +232,25 @@ class Stream():
track_targets=False,
track_cross_junctions=False):
- rewritable = False
if track_targets:
- rewritable = True
-
- self.init_pipeline(targets,
- except_=except_targets,
- rewritable=rewritable,
- track_elements=targets if track_targets else None,
- track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True)
+ track_targets = targets
+ track_selection = selection
+ track_except_targets = except_targets
+ else:
+ track_targets = ()
+ track_selection = PipelineSelection.NONE
+ track_except_targets = ()
- fetch_plan = self._pipeline.get_selection(selection)
+ elements, track_elements = \
+ self._load(targets, track_targets,
+ selection=selection, track_selection=track_selection,
+ except_targets=except_targets,
+ track_except_targets=track_except_targets,
+ track_cross_junctions=track_cross_junctions,
+ fetch_subprojects=True)
- # Delegated to a shared method for now
- self._do_fetch(fetch_plan)
+ # Delegated to a shared fetch method
+ self._fetch(elements, track_elements=track_elements)
# track()
#
@@ -255,26 +268,20 @@ class Stream():
def track(self, targets, *,
selection=PipelineSelection.NONE,
except_targets=None,
- track_targets=False,
cross_junctions=False):
- self.init_pipeline(targets,
- except_=except_targets,
- rewritable=True,
- track_elements=targets,
- track_cross_junctions=cross_junctions,
- track_selection=selection,
- fetch_subprojects=True)
+ _, elements = \
+ self._load(targets, targets,
+ selection=selection, track_selection=selection,
+ except_targets=except_targets,
+ track_except_targets=except_targets,
+ track_cross_junctions=cross_junctions,
+ fetch_subprojects=True)
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- self.session_elements = len(self._pipeline._track_elements)
-
- _, status = self._scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+ self._enqueue_plan(elements, queue=track_queue)
+ self._run()
# pull()
#
@@ -292,33 +299,23 @@ class Stream():
selection=PipelineSelection.NONE,
remote=None):
- use_configured_remote_caches = True
- if remote is not None:
- use_configured_remote_caches = False
+ use_config = True
+ if remote:
+ use_config = False
- self.init_pipeline(targets,
- use_configured_remote_caches=use_configured_remote_caches,
- add_remote_cache=remote,
- fetch_subprojects=True)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ fetch_subprojects=True)
- elements = self._pipeline.get_selection(selection)
-
- if not self._pipeline._artifacts.has_fetch_remotes():
+ if not self._artifacts.has_fetch_remotes():
raise StreamError("No artifact caches available for pulling artifacts")
- plan = elements
- self._pipeline._assert_consistent(plan)
- self._pipeline.session_elements = len(plan)
-
- pull = PullQueue(self._scheduler)
- pull.enqueue(plan)
- queues = [pull]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ self._pipeline.assert_consistent(elements)
+ self._add_queue(PullQueue(self._scheduler))
+ self._enqueue_plan(elements)
+ self._run()
# push()
#
@@ -336,33 +333,23 @@ class Stream():
selection=PipelineSelection.NONE,
remote=None):
- use_configured_remote_caches = True
- if remote is not None:
- use_configured_remote_caches = False
-
- self.init_pipeline(targets,
- use_configured_remote_caches=use_configured_remote_caches,
- add_remote_cache=remote,
- fetch_subprojects=True)
+ use_config = True
+ if remote:
+ use_config = False
- elements = self._pipeline.get_selection(selection)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ fetch_subprojects=True)
- if not self._pipeline._artifacts.has_push_remotes():
+ if not self._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
- plan = elements
- self._pipeline._assert_consistent(plan)
- self._pipeline.session_elements = len(plan)
-
- push = PushQueue(self._scheduler)
- push.enqueue(plan)
- queues = [push]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ self._pipeline.assert_consistent(elements)
+ self._add_queue(PushQueue(self._scheduler))
+ self._enqueue_plan(elements)
+ self._run()
# checkout()
#
@@ -382,10 +369,9 @@ class Stream():
integrate=True,
hardlinks=False):
- self.init_pipeline((target,), fetch_subprojects=True)
-
# We only have one target in a checkout command
- target = self._pipeline.targets[0]
+ elements, _ = self._load((target,), (), fetch_subprojects=True)
+ target = elements[0]
try:
os.makedirs(directory, exist_ok=True)
@@ -433,13 +419,13 @@ class Stream():
track_first,
force):
- self.init_pipeline((target,),
- track_elements=[target] if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=False)
+ if track_first:
+ track_targets = (target,)
+ else:
+ track_targets = ()
- target = self._pipeline.targets[0]
+ elements, track_elements = self._load((target,), track_targets)
+ target = elements[0]
workdir = os.path.abspath(directory)
if not list(target.sources()):
@@ -459,11 +445,11 @@ class Stream():
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
#
- # For now, tracking is handled by _do_fetch() automatically
- # by virtue of our screwed up pipeline initialization stuff.
- #
if not no_checkout or track_first:
- self._do_fetch([target])
+ track_elements = []
+ if track_first:
+ track_elements = elements
+ self._fetch(elements, track_elements=track_elements)
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise StreamError("Could not stage uncached source. " +
@@ -522,34 +508,35 @@ class Stream():
#
def workspace_reset(self, targets, *, track_first):
- self.init_pipeline(targets,
- track_elements=targets if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=False)
+ if track_first:
+ track_targets = targets
+ else:
+ track_targets = ()
+
+ elements, track_elements = self._load(targets, track_targets)
# Do the tracking first
if track_first:
- self._do_fetch(self._pipeline.targets)
+ self._fetch(elements, track_elements=track_elements)
- for target in self._pipeline.targets:
- workspace = self._project.workspaces.get_workspace(target.name)
+ for element in elements:
+ workspace = self._project.workspaces.get_workspace(element.name)
- with target.timed_activity("Removing workspace directory {}"
- .format(workspace.path)):
+ with element.timed_activity("Removing workspace directory {}"
+ .format(workspace.path)):
try:
shutil.rmtree(workspace.path)
except OSError as e:
raise StreamError("Could not remove '{}': {}"
.format(workspace.path, e)) from e
- self._project.workspaces.delete_workspace(target.name)
- self._project.workspaces.create_workspace(target.name, workspace.path)
+ self._project.workspaces.delete_workspace(element.name)
+ self._project.workspaces.create_workspace(element.name, workspace.path)
- with target.timed_activity("Staging sources to {}".format(workspace.path)):
- target._open_workspace()
+ with element.timed_activity("Staging sources to {}".format(workspace.path)):
+ element._open_workspace()
- self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path))
+ self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace.path))
self._project.workspaces.save_config()
@@ -609,15 +596,20 @@ class Stream():
force=False,
compression="gz"):
- self.init_pipeline((target,),
- track_elements=[target] if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=True)
+ if track_first:
+ track_targets = (target,)
+ else:
+ track_targets = ()
+
+ elements, track_elements = self._load((target,), track_targets,
+ selection=PipelineSelection.ALL,
+ track_selection=PipelineSelection.ALL,
+ fetch_subprojects=True)
# source-bundle only supports one target
- target = self._pipeline.targets[0]
- dependencies = self._pipeline.get_selection(PipelineSelection.ALL)
+ target = self.targets[0]
+
+ self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -635,14 +627,15 @@ class Stream():
raise StreamError("Cannot write to {0}: {1}"
.format(tar_location, e)) from e
- plan = list(dependencies)
- self._do_fetch(plan)
+ # Fetch and possibly track first
+ #
+ self._fetch(elements, track_elements=track_elements)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
# Create a temporary directory to build the source tree in
- builddir = target._get_context().builddir
+ builddir = self._context.builddir
prefix = "{}-".format(target.normal_name)
with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
@@ -655,18 +648,162 @@ class Stream():
# Any elements that don't implement _write_script
# should not be included in the later stages.
- plan = [element for element in plan
- if self._write_element_script(source_directory, element)]
+ elements = [
+ element for element in elements
+ if self._write_element_script(source_directory, element)
+ ]
- self._write_element_sources(tempdir, plan)
- self._write_build_script(tempdir, plan)
+ self._write_element_sources(tempdir, elements)
+ self._write_build_script(tempdir, elements)
self._collect_sources(tempdir, tar_location,
target.normal_name, compression)
#############################################################
- # Private Methods #
+ # Scheduler API forwarding #
+ #############################################################
+
+ # running
+ #
+ # Whether the scheduler is running
+ #
+ @property
+ def running(self):
+ return self._scheduler.loop is not None
+
+ # suspended
+ #
+ # Whether the scheduler is currently suspended
+ #
+ @property
+ def suspended(self):
+ return self._scheduler.suspended
+
+ # terminated
+ #
+ # Whether the scheduler is currently terminated
+ #
+ @property
+ def terminated(self):
+ return self._scheduler.terminated
+
+ # elapsed_time
+ #
+ # Elapsed time since the session start
+ #
+ @property
+ def elapsed_time(self):
+ return self._scheduler.elapsed_time()
+
+ # terminate()
+ #
+ # Terminate jobs
+ #
+ def terminate(self):
+ self._scheduler.terminate_jobs()
+
+ # quit()
+ #
+ # Quit the session, this will continue with any ongoing
+ # jobs, use Stream.terminate() instead for cancellation
+ # of ongoing jobs
+ #
+ def quit(self):
+ self._scheduler.stop_queueing()
+
+ # suspend()
+ #
+ # Context manager to suspend ongoing jobs
+ #
+ @contextmanager
+ def suspend(self):
+ with self._scheduler.jobs_suspended():
+ yield
+
+ #############################################################
+ # Private Methods #
#############################################################
+ # _load()
+ #
+ # A convenience method for loading element lists
+ #
+ # Args:
+ # targets (list of str): Main targets to load
+ # track_targets (list of str): Tracking targets
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # track_selection (PipelineSelection): The selection mode for the specified tracking targets
+ # except_targets (list of str): Specified targets to except from fetching
+ # track_except_targets (list of str): Specified targets to except from fetching
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
+ # use_artifact_config (bool): Whether to initialize artifacts with the config
+ # artifact_remote_url (bool): A remote url for initializing the artifacts
+ # fetch_subprojects (bool): Whether to fetch subprojects while loading
+ #
+ # Returns:
+ # (list of Element): The primary element selection
+ # (list of Element): The tracking element selection
+ #
+ def _load(self, targets, track_targets, *,
+ selection=PipelineSelection.NONE,
+ track_selection=PipelineSelection.NONE,
+ except_targets=(),
+ track_except_targets=(),
+ track_cross_junctions=False,
+ use_artifact_config=False,
+ artifact_remote_url=None,
+ fetch_subprojects=False):
+
+ # Load rewritable if we have any tracking selection to make
+ rewritable = False
+ if track_targets:
+ rewritable = True
+
+ # Load all targets
+ elements, except_elements, track_elements, track_except_elements = \
+ self._pipeline.load([targets, except_targets, track_targets, track_except_targets],
+ rewritable=rewritable,
+ fetch_subprojects=fetch_subprojects)
+
+ # Hold on to the targets
+ self.targets = elements
+
+ # Here we should raise an error if the track_elements targets
+ # are not dependencies of the primary targets, this is not
+ # supported.
+ #
+ # This can happen with `bst build --track`
+ #
+ if not self._pipeline.targets_include(elements, track_elements):
+ raise StreamError("Specified tracking targets that are not "
+ "within the scope of primary targets")
+
+ # First take care of marking tracking elements, this must be
+ # done before resolving element states.
+ #
+ assert track_selection != PipelineSelection.PLAN
+ track_selected = self._pipeline.get_selection(track_elements, track_selection)
+ track_selected = self._pipeline.except_elements(track_elements,
+ track_selected,
+ track_except_elements)
+ track_selected = self._pipeline.track_cross_junction_filter(track_selected,
+ track_cross_junctions)
+
+ for element in track_selected:
+ element._schedule_tracking()
+
+ # Connect to remote caches, this needs to be done before resolving element state
+ self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url)
+
+ # Now move on to loading primary selection.
+ #
+ self._pipeline.resolve_elements(elements)
+ selected = self._pipeline.get_selection(elements, selection)
+ selected = self._pipeline.except_elements(elements,
+ selected,
+ except_elements)
+
+ return selected, track_selected
+
# _message()
#
# Local message propagator
@@ -676,47 +813,103 @@ class Stream():
self._context.message(
Message(None, message_type, message, **args))
- # _do_fetch()
+ # _add_queue()
+ #
+ # Adds a queue to the stream
+ #
+ # Args:
+ # queue (Queue): Queue to add to the pipeline
+ # track (bool): Whether this is the tracking queue
+ #
+ def _add_queue(self, queue, *, track=False):
+ self.queues.append(queue)
+
+ if not (track or self._first_non_track_queue):
+ self._first_non_track_queue = queue
+
+ # _enqueue_plan()
+ #
+ # Enqueues planned elements to the specified queue.
+ #
+ # Args:
+ # plan (list of Element): The list of elements to be enqueued
+ # queue (Queue): The target queue, defaults to the first non-track queue
+ #
+ def _enqueue_plan(self, plan, *, queue=None):
+ queue = queue or self._first_non_track_queue
+
+ queue.enqueue(plan)
+ self.session_elements += plan
+
+ # _run()
+ #
+ # Common function for running the scheduler
+ #
+ def _run(self):
+
+ # Inform the frontend of the full list of elements
+ # and the list of elements which will be processed in this run
+ #
+ self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
+
+ if self._session_start_callback is not None:
+ self._session_start_callback()
+
+ _, status = self._scheduler.run(self.queues)
+
+ # Force update element states after a run, such that the summary
+ # is more coherent
+ try:
+ for element in self.total_elements:
+ element._update_state()
+ except BstError as e:
+ self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
+ set_last_task_error(e.domain, e.reason)
+ except Exception as e: # pylint: disable=broad-except
+ self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
+
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
+
+ # _fetch()
#
# Performs the fetch job; the body of this function is here because
# it is shared between a few internals.
#
# Args:
# elements (list of Element): Elements to fetch
+ # track_elements (list of Element): Elements to track
#
- def _do_fetch(self, elements):
+ def _fetch(self, elements, *, track_elements=None):
- fetch_plan = elements
+ if track_elements is None:
+ track_elements = []
# Subtract the track elements from the fetch elements; they will be added separately
- if self._pipeline._track_elements:
- track_elements = set(self._pipeline._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
+ fetch_plan = self._pipeline.subtract_elements(elements, track_elements)
# Assert consistency for the fetch elements
- self._pipeline._assert_consistent(fetch_plan)
+ self._pipeline.assert_consistent(fetch_plan)
# Filter out elements with cached sources from the fetch plan only;
# let the track plan resolve new refs.
cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+ fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached)
- fetch = FetchQueue(self._scheduler)
- fetch.enqueue(fetch_plan)
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
+ # Construct queues, enqueue and run
+ #
+ track_queue = None
+ if track_elements:
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+ self._add_queue(FetchQueue(self._scheduler))
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ if track_elements:
+ self._enqueue_plan(track_elements, queue=track_queue)
+ self._enqueue_plan(fetch_plan)
+ self._run()
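A caller-side usage example for the helper above (hypothetical; the surrounding Stream method is outside this hunk, but the keyword matches the signature shown here):

    # After the loading helper has produced the primary selection and the
    # tracking selection, a business-logic method would simply do:
    self._fetch(selected, track_elements=track_selected)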
# Helper function for checkout()
#
@@ -772,7 +965,7 @@ class Stream():
# Collect the sources in the given sandbox into a tarfile
def _collect_sources(self, directory, tar_name, element_name, compression):
- with self._pipeline.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
+ with self._context.timed_activity("Creating tarball {}".format(tar_name)):
if compression == "none":
permissions = "w:"
else:
@@ -780,64 +973,3 @@ class Stream():
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)
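The "w:" value above is a standard tarfile mode string. As context only, here is a minimal standalone Python sketch of how the stdlib mode strings select compression; the function name and the exact mapping used by the elided else-branch are assumptions, this merely illustrates the tarfile API:

    import tarfile

    # "w:" writes an uncompressed archive; "w:gz", "w:bz2" and "w:xz"
    # select the corresponding compression in the standard library.
    def make_tarball(directory, tar_name, arcname, compression="none"):
        mode = "w:" if compression == "none" else "w:" + compression
        with tarfile.open(tar_name, mode) as tar:
            tar.add(directory, arcname=arcname)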
-
- #############################################################
- # TEMPORARY CRAP #
- #############################################################
-
- # init_pipeline()
- #
- # Initialize the pipeline for a given activity
- #
- # Args:
- # elements (list of elements): The elements to load recursively
- # except_ (list of elements): The elements to except
- # rewritable (bool): Whether we should load the YAML files for roundtripping
- # use_configured_remote_caches (bool): Whether we should contact remotes
- # add_remote_cache (str): The URL for an explicitly mentioned remote cache
- # track_elements (list of elements): Elements which are to be tracked
- # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
- # track_selection (PipelineSelection): The selection algorithm for track elements
- # fetch_subprojects (bool): Whether to fetch subprojects while loading
- #
- # Note that the except_ argument may have a subtly different meaning depending
- # on the activity performed on the Pipeline. In normal circumstances the except_
- # argument excludes elements from the `elements` list. In a build session, the
- # except_ elements are excluded from the tracking plan.
- #
- def init_pipeline(self, elements, *,
- except_=tuple(),
- rewritable=False,
- use_configured_remote_caches=False,
- add_remote_cache=None,
- track_elements=None,
- track_cross_junctions=False,
- track_selection=PipelineSelection.ALL,
- fetch_subprojects=True):
-
- profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
- self._pipeline = Pipeline(self._context, self._project, self._artifacts,
- elements, except_,
- rewritable=rewritable,
- fetch_subprojects=fetch_subprojects)
-
- # After loading the projects, but before resolving cache keys,
- # we need to initialize remote artifact caches where relevant
- #
- self._artifacts.setup_remotes(use_config=use_configured_remote_caches,
- remote_url=add_remote_cache)
-
- # Now complete the initialization
- #
- self._pipeline.initialize(track_elements=track_elements,
- track_cross_junctions=track_cross_junctions,
- track_selection=track_selection)
-
- profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
- # Get the total
- self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
-
- if self._loaded_cb is not None:
- self._loaded_cb(self._pipeline)
diff --git a/tests/frontend/buildtrack.py b/tests/frontend/buildtrack.py
index 84d543e52..3f0a3adbe 100644
--- a/tests/frontend/buildtrack.py
+++ b/tests/frontend/buildtrack.py
@@ -31,40 +31,24 @@ def create_element(repo, name, path, dependencies, ref=None):
@pytest.mark.datafiles(os.path.join(DATA_DIR))
@pytest.mark.parametrize("ref_storage", [('inline'), ('project.refs')])
-@pytest.mark.parametrize("exceptions,excepted", [
+@pytest.mark.parametrize("track_targets,exceptions,tracked", [
# Test with no exceptions
- ([], []),
+ (['0.bst'], [], ['0.bst', '2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
+ (['3.bst'], [], ['3.bst', '4.bst', '5.bst', '6.bst']),
+ (['2.bst', '3.bst'], [], ['2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
# Test excepting '2.bst'
- (['2.bst'], ['2.bst', '7.bst']),
+ (['0.bst'], ['2.bst'], ['0.bst', '3.bst', '4.bst', '5.bst', '6.bst']),
+ (['3.bst'], ['2.bst'], []),
+ (['2.bst', '3.bst'], ['2.bst'], ['3.bst', '4.bst', '5.bst', '6.bst']),
# Test excepting '2.bst' and '3.bst'
- (['2.bst', '3.bst'], [
- '2.bst', '3.bst', '4.bst',
- '5.bst', '6.bst', '7.bst'
- ])
+ (['0.bst'], ['2.bst', '3.bst'], ['0.bst']),
+ (['3.bst'], ['2.bst', '3.bst'], []),
+ (['2.bst', '3.bst'], ['2.bst', '3.bst'], [])
])
-@pytest.mark.parametrize("track_targets,tracked", [
- # Test tracking the main target element
- (['0.bst'], [
- '0.bst', '2.bst', '3.bst',
- '4.bst', '5.bst', '6.bst', '7.bst'
- ]),
-
- # Test tracking a child element
- (['3.bst'], [
- '3.bst', '4.bst', '5.bst',
- '6.bst'
- ]),
-
- # Test tracking multiple children
- (['2.bst', '3.bst'], [
- '2.bst', '3.bst', '4.bst',
- '5.bst', '6.bst', '7.bst'
- ])
-])
-def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
- exceptions, tracked, excepted):
+def test_build_track(cli, datafiles, tmpdir, ref_storage,
+ track_targets, exceptions, tracked):
project = os.path.join(datafiles.dirname, datafiles.basename)
dev_files_path = os.path.join(project, 'files', 'dev-files')
element_path = os.path.join(project, 'elements')
@@ -102,7 +86,7 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
for element, dependencies in create_elements.items():
# Test the element inconsistency resolution by ensuring that
# only elements that aren't tracked have refs
- if element in set(tracked) - set(excepted):
+ if element in set(tracked):
# Elements which should not have a ref set
#
create_element(repo, element, element_path, dependencies)
@@ -133,14 +117,14 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
result = cli.run(project=project, silent=True, args=args)
tracked_elements = result.get_tracked_elements()
- assert set(tracked_elements) == set(tracked) - set(excepted)
+ assert set(tracked_elements) == set(tracked)
# Delete element sources
source_dir = os.path.join(project, 'cache', 'sources')
shutil.rmtree(source_dir)
# Delete artifacts one by one and assert element states
- for target in set(tracked) - set(excepted):
+ for target in set(tracked):
cli.remove_artifact_from_cache(project, target)
# Assert that it's tracked
@@ -154,40 +138,24 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, track_targets,
@pytest.mark.datafiles(os.path.join(DATA_DIR))
-@pytest.mark.parametrize("exceptions,excepted", [
+@pytest.mark.parametrize("track_targets,exceptions,tracked", [
# Test with no exceptions
- ([], []),
+ (['0.bst'], [], ['0.bst', '2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
+ (['3.bst'], [], ['3.bst', '4.bst', '5.bst', '6.bst']),
+ (['2.bst', '3.bst'], [], ['2.bst', '3.bst', '4.bst', '5.bst', '6.bst', '7.bst']),
# Test excepting '2.bst'
- (['2.bst'], ['2.bst', '7.bst']),
+ (['0.bst'], ['2.bst'], ['0.bst', '3.bst', '4.bst', '5.bst', '6.bst']),
+ (['3.bst'], ['2.bst'], []),
+ (['2.bst', '3.bst'], ['2.bst'], ['3.bst', '4.bst', '5.bst', '6.bst']),
# Test excepting '2.bst' and '3.bst'
- (['2.bst', '3.bst'], [
- '2.bst', '3.bst', '4.bst',
- '5.bst', '6.bst', '7.bst'
- ])
-])
-@pytest.mark.parametrize("track_targets,tracked", [
- # Test tracking the main target element
- (['0.bst'], [
- '0.bst', '2.bst', '3.bst',
- '4.bst', '5.bst', '6.bst', '7.bst'
- ]),
-
- # Test tracking a child element
- (['3.bst'], [
- '3.bst', '4.bst', '5.bst',
- '6.bst'
- ]),
-
- # Test tracking multiple children
- (['2.bst', '3.bst'], [
- '2.bst', '3.bst', '4.bst',
- '5.bst', '6.bst', '7.bst'
- ])
+ (['0.bst'], ['2.bst', '3.bst'], ['0.bst']),
+ (['3.bst'], ['2.bst', '3.bst'], []),
+ (['2.bst', '3.bst'], ['2.bst', '3.bst'], [])
])
def test_build_track_update(cli, datafiles, tmpdir, track_targets,
- exceptions, tracked, excepted):
+ exceptions, tracked):
project = os.path.join(datafiles.dirname, datafiles.basename)
dev_files_path = os.path.join(project, 'files', 'dev-files')
element_path = os.path.join(project, 'elements')
@@ -231,7 +199,7 @@ def test_build_track_update(cli, datafiles, tmpdir, track_targets,
result = cli.run(project=project, silent=True, args=args)
tracked_elements = result.get_tracked_elements()
- assert set(tracked_elements) == set(tracked) - set(excepted)
+ assert set(tracked_elements) == set(tracked)
@pytest.mark.datafiles(os.path.join(DATA_DIR))
diff --git a/tests/plugins/pipeline.py b/tests/plugins/pipeline.py
index db683094b..65929cf50 100644
--- a/tests/plugins/pipeline.py
+++ b/tests/plugins/pipeline.py
@@ -23,23 +23,25 @@ def create_pipeline(tmpdir, basedir, target):
context.set_message_handler(dummy_handler)
- return Pipeline(context, project, None, [target], [])
+ pipeline = Pipeline(context, project, None)
+ targets, = pipeline.load([(target,)])
+ return targets
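For reference, the reworked Pipeline.load() used here takes a list of target groups and returns one element group per input group, so callers unpack exactly as many groups as they passed in. A hedged sketch of the multi-group form (variable names are placeholders; the keyword arguments are the ones used by the Stream loading code earlier in this diff):

    # Two input groups in, two element groups out, in the same order.
    elements, except_elements = pipeline.load([targets, except_targets],
                                              rewritable=False,
                                              fetch_subprojects=False)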
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'customsource'))
def test_customsource(datafiles, tmpdir):
basedir = os.path.join(datafiles.dirname, datafiles.basename)
- pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
- assert(pipeline.targets[0].get_kind() == "autotools")
+ targets = create_pipeline(tmpdir, basedir, 'simple.bst')
+ assert(targets[0].get_kind() == "autotools")
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'customelement'))
def test_customelement(datafiles, tmpdir):
basedir = os.path.join(datafiles.dirname, datafiles.basename)
- pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
- assert(pipeline.targets[0].get_kind() == "foo")
+ targets = create_pipeline(tmpdir, basedir, 'simple.bst')
+ assert(targets[0].get_kind() == "foo")
@pytest.mark.datafiles(os.path.join(DATA_DIR, 'badversionsource'))
@@ -47,7 +49,7 @@ def test_badversionsource(datafiles, tmpdir):
basedir = os.path.join(datafiles.dirname, datafiles.basename)
with pytest.raises(LoadError) as exc:
- pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
+ targets = create_pipeline(tmpdir, basedir, 'simple.bst')
assert exc.value.reason == LoadErrorReason.UNSUPPORTED_PLUGIN
@@ -57,6 +59,6 @@ def test_badversionelement(datafiles, tmpdir):
basedir = os.path.join(datafiles.dirname, datafiles.basename)
with pytest.raises(LoadError) as exc:
- pipeline = create_pipeline(tmpdir, basedir, 'simple.bst')
+ targets = create_pipeline(tmpdir, basedir, 'simple.bst')
assert exc.value.reason == LoadErrorReason.UNSUPPORTED_PLUGIN