summaryrefslogtreecommitdiff
path: root/src/buildstream/_stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r--src/buildstream/_stream.py60
1 files changed, 38 insertions, 22 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 8097f451d..a7db33bb9 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -49,7 +49,6 @@ from . import Scope, Consistency
#
# Args:
# context (Context): The Context object
-# project (Project): The Project object
# session_start (datetime): The time when the session started
# session_start_callback (callable): A callback to invoke when the session starts
# interrupt_callback (callable): A callback to invoke when we get interrupted
@@ -59,7 +58,7 @@ from . import Scope, Consistency
#
class Stream():
- def __init__(self, context, project, session_start, *,
+ def __init__(self, context, session_start, *,
session_start_callback=None,
interrupt_callback=None,
ticker_callback=None,
@@ -80,8 +79,8 @@ class Stream():
self._artifacts = context.artifactcache
self._sourcecache = context.sourcecache
self._context = context
- self._project = project
- self._pipeline = Pipeline(context, project, self._artifacts)
+ self._project = None
+ self._pipeline = None
self._scheduler = Scheduler(context, session_start,
interrupt_callback=interrupt_callback,
ticker_callback=ticker_callback,
@@ -98,6 +97,18 @@ class Stream():
if self._project:
self._project.cleanup()
+ # set_project()
+ #
+ # Set the top-level project.
+ #
+ # Args:
+ # project (Project): The Project object
+ #
+ def set_project(self, project):
+ assert self._project is None
+ self._project = project
+ self._pipeline = Pipeline(self._context, project, self._artifacts)
+
# load_selection()
#
# An all purpose method for loading a selection of elements, this
@@ -121,7 +132,6 @@ class Stream():
target_objects, _ = self._load(targets, (),
selection=selection,
except_targets=except_targets,
- fetch_subprojects=False,
use_artifact_config=use_artifact_config,
load_refs=load_refs)
@@ -242,7 +252,6 @@ class Stream():
use_artifact_config=use_config,
artifact_remote_url=remote,
use_source_config=True,
- fetch_subprojects=True,
dynamic_plan=True)
# Remove the tracking elements from the main targets
@@ -323,7 +332,6 @@ class Stream():
except_targets=except_targets,
track_except_targets=track_except_targets,
track_cross_junctions=track_cross_junctions,
- fetch_subprojects=True,
use_source_config=use_source_config,
source_remote_url=remote)
@@ -356,8 +364,7 @@ class Stream():
selection=selection, track_selection=selection,
except_targets=except_targets,
track_except_targets=except_targets,
- track_cross_junctions=cross_junctions,
- fetch_subprojects=True)
+ track_cross_junctions=cross_junctions)
track_queue = TrackQueue(self._scheduler)
self._add_queue(track_queue, track=True)
@@ -390,8 +397,7 @@ class Stream():
selection=selection,
ignore_junction_targets=ignore_junction_targets,
use_artifact_config=use_config,
- artifact_remote_url=remote,
- fetch_subprojects=True)
+ artifact_remote_url=remote)
if not self._artifacts.has_fetch_remotes():
raise StreamError("No artifact caches available for pulling artifacts")
@@ -431,8 +437,7 @@ class Stream():
selection=selection,
ignore_junction_targets=ignore_junction_targets,
use_artifact_config=use_config,
- artifact_remote_url=remote,
- fetch_subprojects=True)
+ artifact_remote_url=remote)
if not self._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
@@ -496,9 +501,7 @@ class Stream():
# if pulling we need to ensure dependency artifacts are also pulled
selection = PipelineSelection.RUN if pull else PipelineSelection.NONE
- elements, _ = self._load(
- (target,), (), selection=selection,
- fetch_subprojects=True, use_artifact_config=True)
+ elements, _ = self._load((target,), (), selection=selection, use_artifact_config=True)
target = elements[-1]
@@ -644,8 +647,7 @@ class Stream():
elements, _ = self._load((target,), (),
selection=deps,
- except_targets=except_targets,
- fetch_subprojects=True)
+ except_targets=except_targets)
# Assert all sources are cached in the source dir
if fetch:
@@ -951,6 +953,23 @@ class Stream():
return list(output_elements)
+ # fetch_subprojects()
+ #
+ # Fetch subprojects as part of the project and element loading process.
+ #
+ # Args:
+ # junctions (list of Element): The junctions to fetch
+ #
+ def fetch_subprojects(self, junctions):
+ old_queues = self.queues
+ try:
+ queue = FetchQueue(self._scheduler)
+ queue.enqueue(junctions)
+ self.queues = [queue]
+ self._run()
+ finally:
+ self.queues = old_queues
+
#############################################################
# Scheduler API forwarding #
#############################################################
@@ -1039,7 +1058,6 @@ class Stream():
# use_source_config (bool): Whether to initialize remote source caches with the config
# artifact_remote_url (str): A remote url for initializing the artifacts
# source_remote_url (str): A remote url for initializing source caches
- # fetch_subprojects (bool): Whether to fetch subprojects while loading
#
# Returns:
# (list of Element): The primary element selection
@@ -1056,7 +1074,6 @@ class Stream():
use_source_config=False,
artifact_remote_url=None,
source_remote_url=None,
- fetch_subprojects=False,
dynamic_plan=False,
load_refs=False):
@@ -1075,8 +1092,7 @@ class Stream():
# Load all target elements
elements, except_elements, track_elements, track_except_elements = \
self._pipeline.load([target_elements, except_targets, track_targets, track_except_targets],
- rewritable=rewritable,
- fetch_subprojects=fetch_subprojects)
+ rewritable=rewritable)
# Obtain the ArtifactElement objects
artifacts = [self._project.create_artifact_element(ref) for ref in target_artifacts]