Diffstat (limited to 'buildstream/_stream.py')
-rw-r--r--  buildstream/_stream.py  688
1 file changed, 410 insertions(+), 278 deletions(-)
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 62d9f9804..c8d0bb69c 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -17,19 +17,22 @@
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
+# Jürg Billeter <juerg.billeter@codethink.co.uk>
+# Tristan Maat <tristan.maat@codethink.co.uk>
+
import os
import stat
import shlex
import shutil
import tarfile
+from contextlib import contextmanager
from tempfile import TemporaryDirectory
-from ._exceptions import StreamError, ImplError, BstError
+from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
from ._message import Message, MessageType
-from ._scheduler import SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
+from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
from ._pipeline import Pipeline, PipelineSelection
from ._platform import Platform
-from ._profile import Topics, profile_start, profile_end
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -41,25 +44,46 @@ from . import Scope, Consistency
# Args:
# context (Context): The Context object
# project (Project): The Project object
-# loaded_callback (callable): A callback to invoke when the pipeline is loaded
+# session_start (datetime): The time when the session started
+# session_start_callback (callable): A callback to invoke when the session starts
+# interrupt_callback (callable): A callback to invoke when we get interrupted
+# ticker_callback (callable): Invoked every second while running the scheduler
+# job_start_callback (callable): Called when a job starts
+# job_complete_callback (callable): Called when a job completes
#
class Stream():
- def __init__(self, context, project, loaded_callback):
- self.session_elements = 0 # Number of elements to process in this session
- self.total_elements = 0 # Number of total potential elements for this pipeline
-
- self._context = context
- self._project = project
- self._scheduler = None
- self._pipeline = None
+ def __init__(self, context, project, session_start, *,
+ session_start_callback=None,
+ interrupt_callback=None,
+ ticker_callback=None,
+ job_start_callback=None,
+ job_complete_callback=None):
- self._loaded_cb = loaded_callback
+ #
+ # Public members
+ #
+ self.targets = [] # Resolved target elements
+ self.session_elements = [] # List of elements being processed this session
+ self.total_elements = [] # Total list of elements based on targets
+ self.queues = [] # Queue objects
- # Load selected platform
+ #
+ # Private members
+ #
Platform.create_instance(context, project)
self._platform = Platform.get_platform()
self._artifacts = self._platform.artifactcache
+ self._context = context
+ self._project = project
+ self._pipeline = Pipeline(context, project, self._artifacts)
+ self._scheduler = Scheduler(context, session_start,
+ interrupt_callback=interrupt_callback,
+ ticker_callback=ticker_callback,
+ job_start_callback=job_start_callback,
+ job_complete_callback=job_complete_callback)
+ self._first_non_track_queue = None
+ self._session_start_callback = session_start_callback
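
# A minimal construction sketch for the new Stream signature; `context`
# and `project` are assumed to exist already, and the interrupt/ticker/job
# callback signatures are deliberately left open with *args since this
# hunk does not pin them down:
from datetime import datetime

stream = Stream(context, project, datetime.now(),
                session_start_callback=lambda: print("session started"),
                interrupt_callback=lambda *args: None,
                ticker_callback=lambda *args: None,
                job_start_callback=lambda *args: None,
                job_complete_callback=lambda *args: None)
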
# cleanup()
#
@@ -81,14 +105,18 @@ class Stream():
# except_targets (list of str): Specified targets to except from fetching
# downloadable (bool): Whether the downloadable state of elements should be resolved
#
+ # Returns:
+ # (list of Element): The selected elements
def load_selection(self, targets, *,
selection=PipelineSelection.NONE,
except_targets=(),
downloadable=False):
- self.init_pipeline(targets, except_=except_targets,
- use_configured_remote_caches=downloadable,
- fetch_subprojects=False)
- return self._pipeline.get_selection(selection)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ except_targets=except_targets,
+ use_artifact_config=downloadable,
+ fetch_subprojects=False)
+ return elements
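
# Usage sketch for the new return value; the target name is hypothetical
# and PipelineSelection.RUN stands in for whatever selection mode the
# caller needs:
elements = stream.load_selection(('hello.bst',),
                                 selection=PipelineSelection.RUN,
                                 downloadable=True)
for element in elements:
    print(element.name)
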
# shell()
#
@@ -118,7 +146,7 @@ class Stream():
if directory is None:
missing_deps = [
dep._get_full_name()
- for dep in self._pipeline.dependencies(scope)
+ for dep in self._pipeline.dependencies([element], scope)
if not dep._cached()
]
if missing_deps:
@@ -145,66 +173,47 @@ class Stream():
track_cross_junctions=False,
build_all=False):
- rewritable = False
- if track_targets:
- rewritable = True
+ if build_all or track_targets:
+ selection = PipelineSelection.ALL
+ else:
+ selection = PipelineSelection.PLAN
- self.init_pipeline(targets,
- except_=track_except,
- rewritable=rewritable,
- use_configured_remote_caches=True,
- track_elements=track_targets,
- track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True)
+ elements, track_elements = \
+ self._load(targets, track_targets,
+ selection=selection, track_selection=PipelineSelection.ALL,
+ track_except_targets=track_except,
+ track_cross_junctions=track_cross_junctions,
+ use_artifact_config=True,
+ fetch_subprojects=True)
- if build_all:
- plan = self._pipeline.dependencies(Scope.ALL)
- else:
- plan = self._pipeline._plan(except_=False)
-
- # We want to start the build queue with any elements that are
- # not being tracked first
- track_elements = set(self._pipeline._track_elements)
- plan = [e for e in plan if e not in track_elements]
-
- # Assert that we have a consistent pipeline now (elements in
- # track_plan will be made consistent)
- self._pipeline._assert_consistent(plan)
-
- fetch = FetchQueue(self._scheduler, skip_cached=True)
- build = BuildQueue(self._scheduler)
- track = None
- pull = None
- push = None
- queues = []
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- queues.append(track)
- if self._pipeline._artifacts.has_fetch_remotes():
- pull = PullQueue(self._scheduler)
- queues.append(pull)
- queues.append(fetch)
- queues.append(build)
- if self._pipeline._artifacts.has_push_remotes():
- push = PushQueue(self._scheduler)
- queues.append(push)
-
- # If we're going to track, tracking elements go into the first queue
- # which is the tracking queue, the rest of the plan goes into the next
- # queue (whatever that happens to be)
- if track:
- queues[0].enqueue(self._pipeline._track_elements)
- queues[1].enqueue(plan)
- else:
- queues[0].enqueue(plan)
+ # Remove the tracking elements from the main targets
+ elements = self._pipeline.subtract_elements(elements, track_elements)
- self.session_elements = len(self._pipeline._track_elements) + len(plan)
+ # Assert that the elements we're not going to track are consistent
+ self._pipeline.assert_consistent(elements)
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ # Now construct the queues
+ #
+ track_queue = None
+ if track_elements:
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+
+ if self._artifacts.has_fetch_remotes():
+ self._add_queue(PullQueue(self._scheduler))
+
+ self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
+ self._add_queue(BuildQueue(self._scheduler))
+
+ if self._artifacts.has_push_remotes():
+ self._add_queue(PushQueue(self._scheduler))
+
+ # Enqueue elements
+ #
+ if track_elements:
+ self._enqueue_plan(track_elements, queue=track_queue)
+ self._enqueue_plan(elements)
+ self._run()
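
# A condensed, standalone model of the queue assembly above; queue
# classes are stood in by strings, purely to show the fixed ordering
# (track first, then pull/fetch/build/push as applicable):
def assemble_queues(tracking, has_fetch_remotes, has_push_remotes):
    queues = []
    if tracking:
        queues.append("track")
    if has_fetch_remotes:
        queues.append("pull")
    queues += ["fetch", "build"]
    if has_push_remotes:
        queues.append("push")
    return queues

assert assemble_queues(True, True, False) == ["track", "pull", "fetch", "build"]
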
# fetch()
#
@@ -223,21 +232,25 @@ class Stream():
track_targets=False,
track_cross_junctions=False):
- rewritable = False
if track_targets:
- rewritable = True
-
- self.init_pipeline(targets,
- except_=except_targets,
- rewritable=rewritable,
- track_elements=targets if track_targets else None,
- track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True)
+ track_targets = targets
+ track_selection = selection
+ track_except_targets = except_targets
+ else:
+ track_targets = ()
+ track_selection = PipelineSelection.NONE
+ track_except_targets = ()
- fetch_plan = self._pipeline.get_selection(selection)
+ elements, track_elements = \
+ self._load(targets, track_targets,
+ selection=selection, track_selection=track_selection,
+ except_targets=except_targets,
+ track_except_targets=track_except_targets,
+ track_cross_junctions=track_cross_junctions,
+ fetch_subprojects=True)
- # Delegated to a shared method for now
- self._do_fetch(fetch_plan)
+ # Delegated to a shared fetch method
+ self._fetch(elements, track_elements=track_elements)
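
# Hedged usage sketch: with track_targets=True the target list doubles as
# the tracking set, so a `bst fetch --track` style invocation looks
# roughly like this (target name hypothetical):
stream.fetch(('hello.bst',),
             track_targets=True,
             track_cross_junctions=False)
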
# track()
#
@@ -255,26 +268,20 @@ class Stream():
def track(self, targets, *,
selection=PipelineSelection.NONE,
except_targets=None,
- track_targets=False,
cross_junctions=False):
- self.init_pipeline(targets,
- except_=except_targets,
- rewritable=True,
- track_elements=targets,
- track_cross_junctions=cross_junctions,
- track_selection=selection,
- fetch_subprojects=True)
+ _, elements = \
+ self._load(targets, targets,
+ selection=selection, track_selection=selection,
+ except_targets=except_targets,
+ track_except_targets=except_targets,
+ track_cross_junctions=cross_junctions,
+ fetch_subprojects=True)
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- self.session_elements = len(self._pipeline._track_elements)
-
- _, status = self._scheduler.run([track])
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+ self._enqueue_plan(elements, queue=track_queue)
+ self._run()
# pull()
#
@@ -292,33 +299,23 @@ class Stream():
selection=PipelineSelection.NONE,
remote=None):
- use_configured_remote_caches = True
- if remote is not None:
- use_configured_remote_caches = False
+ use_config = True
+ if remote:
+ use_config = False
- self.init_pipeline(targets,
- use_configured_remote_caches=use_configured_remote_caches,
- add_remote_cache=remote,
- fetch_subprojects=True)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ fetch_subprojects=True)
- elements = self._pipeline.get_selection(selection)
-
- if not self._pipeline._artifacts.has_fetch_remotes():
+ if not self._artifacts.has_fetch_remotes():
raise StreamError("No artifact caches available for pulling artifacts")
- plan = elements
- self._pipeline._assert_consistent(plan)
- self._pipeline.session_elements = len(plan)
-
- pull = PullQueue(self._scheduler)
- pull.enqueue(plan)
- queues = [pull]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ self._pipeline.assert_consistent(elements)
+ self._add_queue(PullQueue(self._scheduler))
+ self._enqueue_plan(elements)
+ self._run()
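
# Illustrative call: passing an explicit remote flips use_config off, so
# only the given cache is consulted; push() below mirrors this exactly.
# The URL is a placeholder:
stream.pull(('hello.bst',),
            selection=PipelineSelection.NONE,
            remote='https://cache.example.com/artifacts')
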
# push()
#
@@ -336,33 +333,23 @@ class Stream():
selection=PipelineSelection.NONE,
remote=None):
- use_configured_remote_caches = True
- if remote is not None:
- use_configured_remote_caches = False
-
- self.init_pipeline(targets,
- use_configured_remote_caches=use_configured_remote_caches,
- add_remote_cache=remote,
- fetch_subprojects=True)
+ use_config = True
+ if remote:
+ use_config = False
- elements = self._pipeline.get_selection(selection)
+ elements, _ = self._load(targets, (),
+ selection=selection,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ fetch_subprojects=True)
- if not self._pipeline._artifacts.has_push_remotes():
+ if not self._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
- plan = elements
- self._pipeline._assert_consistent(plan)
- self._pipeline.session_elements = len(plan)
-
- push = PushQueue(self._scheduler)
- push.enqueue(plan)
- queues = [push]
-
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ self._pipeline.assert_consistent(elements)
+ self._add_queue(PushQueue(self._scheduler))
+ self._enqueue_plan(elements)
+ self._run()
# checkout()
#
@@ -382,10 +369,9 @@ class Stream():
integrate=True,
hardlinks=False):
- self.init_pipeline((target,), fetch_subprojects=True)
-
# We only have one target in a checkout command
- target = self._pipeline.targets[0]
+ elements, _ = self._load((target,), (), fetch_subprojects=True)
+ target = elements[0]
try:
os.makedirs(directory, exist_ok=True)
@@ -433,13 +419,13 @@ class Stream():
track_first,
force):
- self.init_pipeline((target,),
- track_elements=[target] if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=False)
+ if track_first:
+ track_targets = (target,)
+ else:
+ track_targets = ()
- target = self._pipeline.targets[0]
+ elements, track_elements = self._load((target,), track_targets)
+ target = elements[0]
workdir = os.path.abspath(directory)
if not list(target.sources()):
@@ -459,11 +445,11 @@ class Stream():
# If we're going to checkout, we need at least a fetch,
# if we were asked to track first, we're going to fetch anyway.
#
- # For now, tracking is handled by _do_fetch() automatically
- # by virtue of our screwed up pipeline initialization stuff.
- #
if not no_checkout or track_first:
- self._do_fetch([target])
+ track_elements = []
+ if track_first:
+ track_elements = elements
+ self._fetch(elements, track_elements=track_elements)
if not no_checkout and target._get_consistency() != Consistency.CACHED:
raise StreamError("Could not stage uncached source. " +
@@ -522,34 +508,35 @@ class Stream():
#
def workspace_reset(self, targets, *, track_first):
- self.init_pipeline(targets,
- track_elements=targets if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=False)
+ if track_first:
+ track_targets = targets
+ else:
+ track_targets = ()
+
+ elements, track_elements = self._load(targets, track_targets)
# Do the tracking first
if track_first:
- self._do_fetch(self._pipeline.targets)
+ self._fetch(elements, track_elements=track_elements)
- for target in self._pipeline.targets:
- workspace = self._project.workspaces.get_workspace(target.name)
+ for element in elements:
+ workspace = self._project.workspaces.get_workspace(element.name)
- with target.timed_activity("Removing workspace directory {}"
- .format(workspace.path)):
+ with element.timed_activity("Removing workspace directory {}"
+ .format(workspace.path)):
try:
shutil.rmtree(workspace.path)
except OSError as e:
raise StreamError("Could not remove '{}': {}"
.format(workspace.path, e)) from e
- self._project.workspaces.delete_workspace(target.name)
- self._project.workspaces.create_workspace(target.name, workspace.path)
+ self._project.workspaces.delete_workspace(element.name)
+ self._project.workspaces.create_workspace(element.name, workspace.path)
- with target.timed_activity("Staging sources to {}".format(workspace.path)):
- target._open_workspace()
+ with element.timed_activity("Staging sources to {}".format(workspace.path)):
+ element._open_workspace()
- self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(target.name, workspace.path))
+ self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace.path))
self._project.workspaces.save_config()
@@ -609,15 +596,20 @@ class Stream():
force=False,
compression="gz"):
- self.init_pipeline((target,),
- track_elements=[target] if track_first else None,
- track_selection=PipelineSelection.NONE,
- rewritable=track_first,
- fetch_subprojects=True)
+ if track_first:
+ track_targets = (target,)
+ else:
+ track_targets = ()
+
+ elements, track_elements = self._load((target,), track_targets,
+ selection=PipelineSelection.ALL,
+ track_selection=PipelineSelection.ALL,
+ fetch_subprojects=True)
# source-bundle only supports one target
- target = self._pipeline.targets[0]
- dependencies = self._pipeline.get_selection(PipelineSelection.ALL)
+ target = self.targets[0]
+
+ self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -635,14 +627,15 @@ class Stream():
raise StreamError("Cannot write to {0}: {1}"
.format(tar_location, e)) from e
- plan = list(dependencies)
- self._do_fetch(plan)
+ # Fetch and possibly track first
+ #
+ self._fetch(elements, track_elements=track_elements)
# We don't use the scheduler for this as it is almost entirely IO
# bound.
# Create a temporary directory to build the source tree in
- builddir = target._get_context().builddir
+ builddir = self._context.builddir
prefix = "{}-".format(target.normal_name)
with TemporaryDirectory(prefix=prefix, dir=builddir) as tempdir:
@@ -655,18 +648,162 @@ class Stream():
# Any elements that don't implement _write_script
# should not be included in the later stages.
- plan = [element for element in plan
- if self._write_element_script(source_directory, element)]
+ elements = [
+ element for element in elements
+ if self._write_element_script(source_directory, element)
+ ]
- self._write_element_sources(tempdir, plan)
- self._write_build_script(tempdir, plan)
+ self._write_element_sources(tempdir, elements)
+ self._write_build_script(tempdir, elements)
self._collect_sources(tempdir, tar_location,
target.normal_name, compression)
#############################################################
- # Private Methods #
+ # Scheduler API forwarding #
+ #############################################################
+
+ # running
+ #
+ # Whether the scheduler is running
+ #
+ @property
+ def running(self):
+ return self._scheduler.loop is not None
+
+ # suspended
+ #
+ # Whether the scheduler is currently suspended
+ #
+ @property
+ def suspended(self):
+ return self._scheduler.suspended
+
+ # terminated
+ #
+ # Whether the scheduler is currently terminated
+ #
+ @property
+ def terminated(self):
+ return self._scheduler.terminated
+
+ # elapsed_time
+ #
+ # Elapsed time since the session start
+ #
+ @property
+ def elapsed_time(self):
+ return self._scheduler.elapsed_time()
+
+ # terminate()
+ #
+ # Terminate jobs
+ #
+ def terminate(self):
+ self._scheduler.terminate_jobs()
+
+ # quit()
+ #
+ # Quit the session; ongoing jobs are allowed to complete.
+ # Use Stream.terminate() instead to cancel ongoing jobs.
+ #
+ def quit(self):
+ self._scheduler.stop_queueing()
+
+ # suspend()
+ #
+ # Context manager to suspend ongoing jobs
+ #
+ @contextmanager
+ def suspend(self):
+ with self._scheduler.jobs_suspended():
+ yield
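
# Sketch of how a frontend might drive this forwarded API, e.g. from an
# interactive interrupt prompt (the surrounding signal wiring is assumed,
# not shown in this patch):
if stream.running and not stream.suspended:
    with stream.suspend():
        answer = input("Session suspended. Quit? [y/N] ")
    if answer.lower() == 'y':
        stream.terminate()
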
+
+ #############################################################
+ # Private Methods #
#############################################################
+ # _load()
+ #
+ # A convenience method for loading element lists
+ #
+ # Args:
+ # targets (list of str): Main targets to load
+ # track_targets (list of str): Tracking targets
+ # selection (PipelineSelection): The selection mode for the specified targets
+ # track_selection (PipelineSelection): The selection mode for the specified tracking targets
+ # except_targets (list of str): Specified targets to except from fetching
+ # track_except_targets (list of str): Specified targets to except from tracking
+ # track_cross_junctions (bool): Whether tracking should cross junction boundaries
+ # use_artifact_config (bool): Whether to initialize artifacts with the config
+ # artifact_remote_url (str): A remote URL for initializing the artifacts
+ # fetch_subprojects (bool): Whether to fetch subprojects while loading
+ #
+ # Returns:
+ # (list of Element): The primary element selection
+ # (list of Element): The tracking element selection
+ #
+ def _load(self, targets, track_targets, *,
+ selection=PipelineSelection.NONE,
+ track_selection=PipelineSelection.NONE,
+ except_targets=(),
+ track_except_targets=(),
+ track_cross_junctions=False,
+ use_artifact_config=False,
+ artifact_remote_url=None,
+ fetch_subprojects=False):
+
+ # Load rewritable if we have any tracking selection to make
+ rewritable = False
+ if track_targets:
+ rewritable = True
+
+ # Load all targets
+ elements, except_elements, track_elements, track_except_elements = \
+ self._pipeline.load([targets, except_targets, track_targets, track_except_targets],
+ rewritable=rewritable,
+ fetch_subprojects=fetch_subprojects)
+
+ # Hold on to the targets
+ self.targets = elements
+
+ # Here we should raise an error if the track_elements targets
+ # are not dependencies of the primary targets; this is not
+ # supported.
+ #
+ # This can happen with `bst build --track`
+ #
+ if not self._pipeline.targets_include(elements, track_elements):
+ raise StreamError("Specified tracking targets that are not "
+ "within the scope of primary targets")
+
+ # First take care of marking tracking elements, this must be
+ # done before resolving element states.
+ #
+ assert track_selection != PipelineSelection.PLAN
+ track_selected = self._pipeline.get_selection(track_elements, track_selection)
+ track_selected = self._pipeline.except_elements(track_elements,
+ track_selected,
+ track_except_elements)
+ track_selected = self._pipeline.track_cross_junction_filter(track_selected,
+ track_cross_junctions)
+
+ for element in track_selected:
+ element._schedule_tracking()
+
+ # Connect to remote caches, this needs to be done before resolving element state
+ self._artifacts.setup_remotes(use_config=use_artifact_config, remote_url=artifact_remote_url)
+
+ # Now move on to loading primary selection.
+ #
+ self._pipeline.resolve_elements(elements)
+ selected = self._pipeline.get_selection(elements, selection)
+ selected = self._pipeline.except_elements(elements,
+ selected,
+ except_elements)
+
+ return selected, track_selected
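
# Internal flow in brief: a `bst build --track` style session would call
# this with separate build and tracking targets and get two element lists
# back (target names hypothetical; the tracking target must be within the
# scope of the primary target, per the check above):
elements, track_elements = stream._load(
    ('hello.bst',), ('base.bst',),
    selection=PipelineSelection.PLAN,
    track_selection=PipelineSelection.ALL,
    fetch_subprojects=True)
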
+
# _message()
#
# Local message propagator
@@ -676,47 +813,103 @@ class Stream():
self._context.message(
Message(None, message_type, message, **args))
- # _do_fetch()
+ # _add_queue()
+ #
+ # Adds a queue to the stream
+ #
+ # Args:
+ # queue (Queue): Queue to add to the pipeline
+ # track (bool): Whether this is the tracking queue
+ #
+ def _add_queue(self, queue, *, track=False):
+ self.queues.append(queue)
+
+ if not (track or self._first_non_track_queue):
+ self._first_non_track_queue = queue
+
+ # _enqueue_plan()
+ #
+ # Enqueues planned elements to the specified queue.
+ #
+ # Args:
+ # plan (list of Element): The list of elements to be enqueued
+ # queue (Queue): The target queue, defaults to the first non-track queue
+ #
+ def _enqueue_plan(self, plan, *, queue=None):
+ queue = queue or self._first_non_track_queue
+
+ queue.enqueue(plan)
+ self.session_elements += plan
+
+ # _run()
+ #
+ # Common function for running the scheduler
+ #
+ def _run(self):
+
+ # Inform the frontend of the full list of elements
+ # and the list of elements which will be processed in this run
+ #
+ self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
+
+ if self._session_start_callback is not None:
+ self._session_start_callback()
+
+ _, status = self._scheduler.run(self.queues)
+
+ # Force update element states after a run, such that the summary
+ # is more coherent
+ try:
+ for element in self.total_elements:
+ element._update_state()
+ except BstError as e:
+ self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
+ set_last_task_error(e.domain, e.reason)
+ except Exception as e: # pylint: disable=broad-except
+ self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
+
+ if status == SchedStatus.ERROR:
+ raise StreamError()
+ elif status == SchedStatus.TERMINATED:
+ raise StreamError(terminated=True)
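
# From the caller's side the raises above make termination
# distinguishable from failure; assuming StreamError carries the
# `terminated` flag it is constructed with, a frontend could do:
try:
    stream.build(('hello.bst',))
except StreamError as e:
    if e.terminated:
        print("terminated by user")
    else:
        print("session failed")
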
+
+ # _fetch()
#
# Performs the fetch job, the body of this function is here because
# it is shared between a few internals.
#
# Args:
# elements (list of Element): Elements to fetch
+ # track_elements (list of Element): Elements to track
#
- def _do_fetch(self, elements):
+ def _fetch(self, elements, *, track_elements=None):
- fetch_plan = elements
+ if track_elements is None:
+ track_elements = []
# Subtract the track elements from the fetch elements, they will be added separately
- if self._pipeline._track_elements:
- track_elements = set(self._pipeline._track_elements)
- fetch_plan = [e for e in fetch_plan if e not in track_elements]
+ fetch_plan = self._pipeline.subtract_elements(elements, track_elements)
# Assert consistency for the fetch elements
- self._pipeline._assert_consistent(fetch_plan)
+ self._pipeline.assert_consistent(fetch_plan)
# Filter out elements with cached sources, only from the fetch plan
# let the track plan resolve new refs.
cached = [elt for elt in fetch_plan if elt._get_consistency() == Consistency.CACHED]
- fetch_plan = [elt for elt in fetch_plan if elt not in cached]
-
- self.session_elements = len(self._pipeline._track_elements) + len(fetch_plan)
+ fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached)
- fetch = FetchQueue(self._scheduler)
- fetch.enqueue(fetch_plan)
- if self._pipeline._track_elements:
- track = TrackQueue(self._scheduler)
- track.enqueue(self._pipeline._track_elements)
- queues = [track, fetch]
- else:
- queues = [fetch]
+ # Construct queues, enqueue and run
+ #
+ track_queue = None
+ if track_elements:
+ track_queue = TrackQueue(self._scheduler)
+ self._add_queue(track_queue, track=True)
+ self._add_queue(FetchQueue(self._scheduler))
- _, status = self._scheduler.run(queues)
- if status == SchedStatus.ERROR:
- raise StreamError()
- elif status == SchedStatus.TERMINATED:
- raise StreamError(terminated=True)
+ if track_elements:
+ self._enqueue_plan(track_elements, queue=track_queue)
+ self._enqueue_plan(fetch_plan)
+ self._run()
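
# The subtract/assert dance above relies on Pipeline.subtract_elements();
# a tiny standalone model of its assumed behavior (ordered list
# difference, mirroring the list comprehension this patch removes):
def subtract_elements(elements, removed):
    removed = set(removed)
    return [e for e in elements if e not in removed]

assert subtract_elements(['a', 'b', 'c'], ['b']) == ['a', 'c']
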
# Helper function for checkout()
#
@@ -772,7 +965,7 @@ class Stream():
# Collect the sources in the given sandbox into a tarfile
def _collect_sources(self, directory, tar_name, element_name, compression):
- with self._pipeline.targets[0].timed_activity("Creating tarball {}".format(tar_name)):
+ with self._context.timed_activity("Creating tarball {}".format(tar_name)):
if compression == "none":
permissions = "w:"
else:
@@ -780,64 +973,3 @@ class Stream():
with tarfile.open(tar_name, permissions) as tar:
tar.add(directory, arcname=element_name)
-
- #############################################################
- # TEMPORARY CRAP #
- #############################################################
-
- # init_pipeline()
- #
- # Initialize the pipeline for a given activity
- #
- # Args:
- # elements (list of elements): The elements to load recursively
- # except_ (list of elements): The elements to except
- # rewritable (bool): Whether we should load the YAML files for roundtripping
- # use_configured_remote_caches (bool): Whether we should contact remotes
- # add_remote_cache (str): The URL for an explicitly mentioned remote cache
- # track_elements (list of elements): Elements which are to be tracked
- # track_cross_junctions (bool): Whether tracking is allowed to cross junction boundaries
- # track_selection (PipelineSelection): The selection algorithm for track elements
- # fetch_subprojects (bool): Whether to fetch subprojects while loading
- #
- # Note that the except_ argument may have a subtly different meaning depending
- # on the activity performed on the Pipeline. In normal circumstances the except_
- # argument excludes elements from the `elements` list. In a build session, the
- # except_ elements are excluded from the tracking plan.
- #
- def init_pipeline(self, elements, *,
- except_=tuple(),
- rewritable=False,
- use_configured_remote_caches=False,
- add_remote_cache=None,
- track_elements=None,
- track_cross_junctions=False,
- track_selection=PipelineSelection.ALL,
- fetch_subprojects=True):
-
- profile_start(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
- self._pipeline = Pipeline(self._context, self._project, self._artifacts,
- elements, except_,
- rewritable=rewritable,
- fetch_subprojects=fetch_subprojects)
-
- # After loading the projects, but before resolving cache keys,
- # we need to initialize remote artifact caches where relevant
- #
- self._artifacts.setup_remotes(use_config=use_configured_remote_caches,
- remote_url=add_remote_cache)
-
- # Now complete the initialization
- #
- self._pipeline.initialize(track_elements=track_elements,
- track_cross_junctions=track_cross_junctions,
- track_selection=track_selection)
-
- profile_end(Topics.LOAD_PIPELINE, "_".join(t.replace(os.sep, '-') for t in elements))
-
- # Get the total
- self.total_elements = len(list(self._pipeline.dependencies(Scope.ALL)))
-
- if self._loaded_cb is not None:
- self._loaded_cb(self._pipeline)