Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r--  src/buildstream/_stream.py | 538
1 file changed, 286 insertions(+), 252 deletions(-)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index f09a46185..2515fadce 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -33,8 +33,19 @@ from collections import deque
from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
from ._message import Message, MessageType
-from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
- SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus
+from ._scheduler import (
+ Scheduler,
+ SchedStatus,
+ TrackQueue,
+ FetchQueue,
+ SourcePushQueue,
+ BuildQueue,
+ PullQueue,
+ ArtifactPushQueue,
+ NotificationType,
+ Notification,
+ JobStatus,
+)
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
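This hunk follows the conventions of an automated formatter (the style matches Black: parenthesized multi-line imports, one name per line, trailing comma). A minimal sketch of the same rule, applied to a stdlib import so it runs standalone:

    # The trailing comma keeps one name per line, so adding or removing a
    # name later is a one-line diff rather than a rewrapped block.
    from collections import (
        OrderedDict,
        defaultdict,
        deque,
    )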
@@ -55,20 +66,18 @@ from . import Scope, Consistency
# interrupt_callback (callable): A callback to invoke when we get interrupted
# ticker_callback (callable): Invoked every second while running the scheduler
#
-class Stream():
-
- def __init__(self, context, session_start, *,
- session_start_callback=None,
- interrupt_callback=None,
- ticker_callback=None):
+class Stream:
+ def __init__(
+ self, context, session_start, *, session_start_callback=None, interrupt_callback=None, ticker_callback=None
+ ):
#
# Public members
#
- self.targets = [] # Resolved target elements
- self.session_elements = [] # List of elements being processed this session
- self.total_elements = [] # Total list of elements based on targets
- self.queues = [] # Queue objects
+ self.targets = [] # Resolved target elements
+ self.session_elements = [] # List of elements being processed this session
+ self.total_elements = [] # Total list of elements based on targets
+ self.queues = [] # Queue objects
#
# Private members
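A note on the signatures being reflowed here: the bare `*` in `__init__` makes every following parameter keyword-only, so the formatter can rewrap these long signatures freely without changing any call site. A self-contained demonstration (the function name is made up for the sketch):

    def connect(context, *, ticker_callback=None, interrupt_callback=None):
        # Everything after the bare "*" must be passed by keyword.
        return (context, ticker_callback, interrupt_callback)

    connect("ctx", ticker_callback=print)   # OK
    # connect("ctx", print)                 # TypeError: too many positional arguments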
@@ -84,8 +93,9 @@ class Stream():
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
- self._scheduler_notification_handler)
+ self._scheduler = Scheduler(
+ context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler
+ )
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
self._ticker_callback = ticker_callback
@@ -138,17 +148,24 @@ class Stream():
#
# Returns:
# (list of Element): The selected elements
- def load_selection(self, targets, *,
- selection=PipelineSelection.NONE,
- except_targets=(),
- use_artifact_config=False,
- load_refs=False):
+ def load_selection(
+ self,
+ targets,
+ *,
+ selection=PipelineSelection.NONE,
+ except_targets=(),
+ use_artifact_config=False,
+ load_refs=False
+ ):
with PROFILER.profile(Topics.LOAD_SELECTION, "_".join(t.replace(os.sep, "-") for t in targets)):
- target_objects, _ = self._load(targets, (),
- selection=selection,
- except_targets=except_targets,
- use_artifact_config=use_artifact_config,
- load_refs=load_refs)
+ target_objects, _ = self._load(
+ targets,
+ (),
+ selection=selection,
+ except_targets=except_targets,
+ use_artifact_config=use_artifact_config,
+ load_refs=load_refs,
+ )
return target_objects
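For reference, the profiler topic string built inside `PROFILER.profile(...)` above flattens each target path into a single token; a runnable illustration of that expression (the target names are made up):

    import os

    targets = ["core/base.bst", "apps/hello.bst"]
    topic = "_".join(t.replace(os.sep, "-") for t in targets)
    print(topic)   # on POSIX: core-base.bst_apps-hello.bst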
@@ -171,14 +188,20 @@ class Stream():
# Returns:
# (int): The exit code of the launched shell
#
- def shell(self, element, scope, prompt, *,
- directory=None,
- mounts=None,
- isolate=False,
- command=None,
- usebuildtree=None,
- pull_dependencies=None,
- unique_id=None):
+ def shell(
+ self,
+ element,
+ scope,
+ prompt,
+ *,
+ directory=None,
+ mounts=None,
+ isolate=False,
+ command=None,
+ usebuildtree=None,
+ pull_dependencies=None,
+ unique_id=None
+ ):
# Load the Element via the unique_id if given
if unique_id and element is None:
@@ -192,18 +215,16 @@ class Stream():
if not element._source_cached():
raise StreamError(
"Sources for element {} are not cached."
- "Element must be fetched.".format(element._get_full_name()))
+ "Element must be fetched.".format(element._get_full_name())
+ )
- missing_deps = [
- dep for dep in self._pipeline.dependencies([element], scope)
- if not dep._cached()
- ]
+ missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()]
if missing_deps:
if not pull_dependencies:
raise StreamError(
"Elements need to be built or downloaded before staging a shell environment",
- detail="\n"
- .join(list(map(lambda x: x._get_full_name(), missing_deps))))
+ detail="\n".join(list(map(lambda x: x._get_full_name(), missing_deps))),
+ )
self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifacts")
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
@@ -236,8 +257,9 @@ class Stream():
else:
buildtree = True
- return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command,
- usebuildtree=buildtree)
+ return element._shell(
+ scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree
+ )
# build()
#
@@ -252,23 +274,22 @@ class Stream():
# If `remote` is specified as None, then the regular configuration will be
# used to determine where to push artifacts.
#
- def build(self, targets, *,
- selection=PipelineSelection.PLAN,
- ignore_junction_targets=False,
- remote=None):
+ def build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None):
use_config = True
if remote:
use_config = False
- elements, _ = \
- self._load(targets, [],
- selection=selection,
- ignore_junction_targets=ignore_junction_targets,
- use_artifact_config=use_config,
- artifact_remote_url=remote,
- use_source_config=True,
- dynamic_plan=True)
+ elements, _ = self._load(
+ targets,
+ [],
+ selection=selection,
+ ignore_junction_targets=ignore_junction_targets,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ use_source_config=True,
+ dynamic_plan=True,
+ )
# Assert that the elements are consistent
self._pipeline.assert_consistent(elements)
@@ -317,12 +338,16 @@ class Stream():
# track_cross_junctions (bool): Whether tracking should cross junction boundaries
# remote (str|None): The URL of a specific remote server to pull from.
#
- def fetch(self, targets, *,
- selection=PipelineSelection.PLAN,
- except_targets=None,
- track_targets=False,
- track_cross_junctions=False,
- remote=None):
+ def fetch(
+ self,
+ targets,
+ *,
+ selection=PipelineSelection.PLAN,
+ except_targets=None,
+ track_targets=False,
+ track_cross_junctions=False,
+ remote=None
+ ):
if track_targets:
track_targets = targets
@@ -337,14 +362,17 @@ class Stream():
if remote:
use_source_config = False
- elements, track_elements = \
- self._load(targets, track_targets,
- selection=selection, track_selection=track_selection,
- except_targets=except_targets,
- track_except_targets=track_except_targets,
- track_cross_junctions=track_cross_junctions,
- use_source_config=use_source_config,
- source_remote_url=remote)
+ elements, track_elements = self._load(
+ targets,
+ track_targets,
+ selection=selection,
+ track_selection=track_selection,
+ except_targets=except_targets,
+ track_except_targets=track_except_targets,
+ track_cross_junctions=track_cross_junctions,
+ use_source_config=use_source_config,
+ source_remote_url=remote,
+ )
# Delegated to a shared fetch method
self._fetch(elements, track_elements=track_elements)
@@ -362,20 +390,20 @@ class Stream():
# If no error is encountered while tracking, then the project files
# are rewritten inline.
#
- def track(self, targets, *,
- selection=PipelineSelection.REDIRECT,
- except_targets=None,
- cross_junctions=False):
+ def track(self, targets, *, selection=PipelineSelection.REDIRECT, except_targets=None, cross_junctions=False):
# We pass no target to build. Only to track. Passing build targets
# would fully load project configuration which might not be
# possible before tracking is done.
- _, elements = \
- self._load([], targets,
- selection=selection, track_selection=selection,
- except_targets=except_targets,
- track_except_targets=except_targets,
- track_cross_junctions=cross_junctions)
+ _, elements = self._load(
+ [],
+ targets,
+ selection=selection,
+ track_selection=selection,
+ except_targets=except_targets,
+ track_except_targets=except_targets,
+ track_cross_junctions=cross_junctions,
+ )
# FIXME: this can be refactored after element._update_state is simplified/removed
elements = [element for element in elements if element._schedule_tracking()]
@@ -400,21 +428,21 @@ class Stream():
# If `remote` is specified as None, then the regular configuration will be
# used to determine where to pull artifacts from.
#
- def pull(self, targets, *,
- selection=PipelineSelection.NONE,
- ignore_junction_targets=False,
- remote=None):
+ def pull(self, targets, *, selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None):
use_config = True
if remote:
use_config = False
- elements, _ = self._load(targets, (),
- selection=selection,
- ignore_junction_targets=ignore_junction_targets,
- use_artifact_config=use_config,
- artifact_remote_url=remote,
- load_refs=True)
+ elements, _ = self._load(
+ targets,
+ (),
+ selection=selection,
+ ignore_junction_targets=ignore_junction_targets,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ load_refs=True,
+ )
if not self._artifacts.has_fetch_remotes():
raise StreamError("No artifact caches available for pulling artifacts")
@@ -442,21 +470,21 @@ class Stream():
# a pull queue will be created if user context and available remotes allow for
# attempting to fetch them.
#
- def push(self, targets, *,
- selection=PipelineSelection.NONE,
- ignore_junction_targets=False,
- remote=None):
+ def push(self, targets, *, selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None):
use_config = True
if remote:
use_config = False
- elements, _ = self._load(targets, (),
- selection=selection,
- ignore_junction_targets=ignore_junction_targets,
- use_artifact_config=use_config,
- artifact_remote_url=remote,
- load_refs=True)
+ elements, _ = self._load(
+ targets,
+ (),
+ selection=selection,
+ ignore_junction_targets=ignore_junction_targets,
+ use_artifact_config=use_config,
+ artifact_remote_url=remote,
+ load_refs=True,
+ )
if not self._artifacts.has_push_remotes():
raise StreamError("No artifact caches available for pushing artifacts")
@@ -500,8 +528,10 @@ class Stream():
# ready an uncached element in the PushQueue.
if self._context.sched_error_action == _SchedulerErrorAction.CONTINUE and uncached_elements:
names = [element.name for element in uncached_elements]
- fail_str = "Error while pushing. The following elements were not pushed as they are " \
+ fail_str = (
+ "Error while pushing. The following elements were not pushed as they are "
"not yet cached:\n\n\t{}\n".format("\n\t".join(names))
+ )
raise StreamError(fail_str)
@@ -525,15 +555,19 @@ class Stream():
# pull (bool): If true will attempt to pull any missing or incomplete
# artifacts.
#
- def checkout(self, target, *,
- location=None,
- force=False,
- selection=PipelineSelection.RUN,
- integrate=True,
- hardlinks=False,
- compression='',
- pull=False,
- tar=False):
+ def checkout(
+ self,
+ target,
+ *,
+ location=None,
+ force=False,
+ selection=PipelineSelection.RUN,
+ integrate=True,
+ hardlinks=False,
+ compression="",
+ pull=False,
+ tar=False
+ ):
elements, _ = self._load((target,), (), selection=selection, use_artifact_config=True, load_refs=True)
@@ -554,15 +588,15 @@ class Stream():
self._run()
try:
- scope = {'run': Scope.RUN, 'build': Scope.BUILD, 'none': Scope.NONE, 'all': Scope.ALL}
- with target._prepare_sandbox(scope=scope[selection], directory=None,
- integrate=integrate) as sandbox:
+ scope = {"run": Scope.RUN, "build": Scope.BUILD, "none": Scope.NONE, "all": Scope.ALL}
+ with target._prepare_sandbox(scope=scope[selection], directory=None, integrate=integrate) as sandbox:
# Copy or move the sandbox to the target directory
virdir = sandbox.get_virtual_directory()
self._export_artifact(tar, location, compression, target, hardlinks, virdir)
except BstError as e:
- raise StreamError("Error while staging dependencies into a sandbox"
- ": '{}'".format(e), detail=e.detail, reason=e.reason) from e
+ raise StreamError(
+ "Error while staging dependencies into a sandbox" ": '{}'".format(e), detail=e.detail, reason=e.reason
+ ) from e
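The reformatted error messages in this and the previous hunks lean on Python's implicit concatenation of adjacent string literals. Note that the two fragments fuse at compile time, before `.format()` is applied, so the call formats the combined string. A runnable check:

    # Adjacent literals become one string first; .format() then runs on the
    # combined literal, so both fragments appear in the result.
    msg = "Error while staging dependencies into a sandbox" ": '{}'".format("boom")
    assert msg == "Error while staging dependencies into a sandbox: 'boom'"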
# _export_artifact()
#
@@ -578,34 +612,32 @@ class Stream():
#
def _export_artifact(self, tar, location, compression, target, hardlinks, virdir):
if not tar:
- with target.timed_activity("Checking out files in '{}'"
- .format(location)):
+ with target.timed_activity("Checking out files in '{}'".format(location)):
try:
if hardlinks:
self._checkout_hardlinks(virdir, location)
else:
virdir.export_files(location)
except OSError as e:
- raise StreamError("Failed to checkout files: '{}'"
- .format(e)) from e
+ raise StreamError("Failed to checkout files: '{}'".format(e)) from e
else:
- to_stdout = location == '-'
+ to_stdout = location == "-"
mode = _handle_compression(compression, to_stream=to_stdout)
with target.timed_activity("Creating tarball"):
if to_stdout:
# Save the stdout FD to restore later
saved_fd = os.dup(sys.stdout.fileno())
try:
- with os.fdopen(sys.stdout.fileno(), 'wb') as fo:
+ with os.fdopen(sys.stdout.fileno(), "wb") as fo:
with tarfile.open(fileobj=fo, mode=mode) as tf:
- virdir.export_to_tar(tf, '.')
+ virdir.export_to_tar(tf, ".")
finally:
# No matter what, restore stdout for further use
os.dup2(saved_fd, sys.stdout.fileno())
os.close(saved_fd)
else:
with tarfile.open(location, mode=mode) as tf:
- virdir.export_to_tar(tf, '.')
+ virdir.export_to_tar(tf, ".")
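The stdout branch above has to juggle raw file descriptors because `os.fdopen` takes ownership of the descriptor and closes it when the `with` block exits. A self-contained sketch of the same save-and-restore pattern (it writes a tarball to stdout, so redirect to a file when trying it):

    import os
    import sys
    import tarfile
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        payload = os.path.join(tmp, "payload.txt")
        with open(payload, "w") as f:
            f.write("hello\n")

        saved_fd = os.dup(sys.stdout.fileno())      # keep a copy of real stdout
        try:
            with os.fdopen(sys.stdout.fileno(), "wb") as fo:  # closes fd 1 on exit
                with tarfile.open(fileobj=fo, mode="w|") as tf:  # streaming mode
                    tf.add(payload, arcname="payload.txt")
        finally:
            os.dup2(saved_fd, sys.stdout.fileno())  # restore stdout for later use
            os.close(saved_fd)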
# artifact_show()
#
@@ -614,13 +646,9 @@ class Stream():
# Args:
# targets (str): Targets to show the cached state of
#
- def artifact_show(self, targets, *,
- selection=PipelineSelection.NONE):
+ def artifact_show(self, targets, *, selection=PipelineSelection.NONE):
# Obtain list of Element and/or ArtifactElement objects
- target_objects = self.load_selection(targets,
- selection=selection,
- use_artifact_config=True,
- load_refs=True)
+ target_objects = self.load_selection(targets, selection=selection, use_artifact_config=True, load_refs=True)
if self._artifacts.has_fetch_remotes():
self._pipeline.check_remotes(target_objects)
@@ -695,8 +723,7 @@ class Stream():
# Args:
# targets (str): Targets to remove
#
- def artifact_delete(self, targets, *,
- selection=PipelineSelection.NONE):
+ def artifact_delete(self, targets, *, selection=PipelineSelection.NONE):
# Return list of Element and/or ArtifactElement objects
target_objects = self.load_selection(targets, selection=selection, load_refs=True)
@@ -736,20 +763,22 @@ class Stream():
# compression (str): The type of compression for tarball
# include_build_scripts (bool): Whether to include build scripts in the checkout
#
- def source_checkout(self, target, *,
- location=None,
- force=False,
- deps='none',
- except_targets=(),
- tar=False,
- compression=None,
- include_build_scripts=False):
+ def source_checkout(
+ self,
+ target,
+ *,
+ location=None,
+ force=False,
+ deps="none",
+ except_targets=(),
+ tar=False,
+ compression=None,
+ include_build_scripts=False
+ ):
self._check_location_writable(location, force=force, tar=tar)
- elements, _ = self._load((target,), (),
- selection=deps,
- except_targets=except_targets)
+ elements, _ = self._load((target,), (), selection=deps, except_targets=except_targets)
# Assert all sources are cached in the source dir
self._fetch(elements)
@@ -757,11 +786,11 @@ class Stream():
# Stage all sources determined by scope
try:
- self._source_checkout(elements, location, force, deps,
- tar, compression, include_build_scripts)
+ self._source_checkout(elements, location, force, deps, tar, compression, include_build_scripts)
except BstError as e:
- raise StreamError("Error while writing sources"
- ": '{}'".format(e), detail=e.detail, reason=e.reason) from e
+ raise StreamError(
+ "Error while writing sources" ": '{}'".format(e), detail=e.detail, reason=e.reason
+ ) from e
self._message(MessageType.INFO, "Checked out sources to '{}'".format(location))
@@ -776,11 +805,7 @@ class Stream():
# force (bool): Whether to ignore contents in an existing directory
# custom_dir (str): Custom location to create a workspace or false to use default location.
#
- def workspace_open(self, targets, *,
- no_checkout,
- track_first,
- force,
- custom_dir):
+ def workspace_open(self, targets, *, no_checkout, track_first, force, custom_dir):
# This function is a little funny but it is trying to be as atomic as possible.
if track_first:
@@ -788,9 +813,9 @@ class Stream():
else:
track_targets = ()
- elements, track_elements = self._load(targets, track_targets,
- selection=PipelineSelection.REDIRECT,
- track_selection=PipelineSelection.REDIRECT)
+ elements, track_elements = self._load(
+ targets, track_targets, selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT
+ )
workspaces = self._context.get_workspaces()
@@ -819,33 +844,44 @@ class Stream():
workspace = workspaces.get_workspace(target._get_full_name())
if workspace:
if not force:
- raise StreamError("Element '{}' already has an open workspace defined at: {}"
- .format(target.name, workspace.get_absolute_path()))
+ raise StreamError(
+ "Element '{}' already has an open workspace defined at: {}".format(
+ target.name, workspace.get_absolute_path()
+ )
+ )
if not no_checkout:
- target.warn("Replacing existing workspace for element '{}' defined at: {}"
- .format(target.name, workspace.get_absolute_path()))
+ target.warn(
+ "Replacing existing workspace for element '{}' defined at: {}".format(
+ target.name, workspace.get_absolute_path()
+ )
+ )
self.workspace_close(target._get_full_name(), remove_dir=not no_checkout)
target_consistency = target._get_consistency()
- if not no_checkout and target_consistency < Consistency.CACHED and \
- target_consistency._source_cached():
- raise StreamError("Could not stage uncached source. For {} ".format(target.name) +
- "Use `--track` to track and " +
- "fetch the latest version of the " +
- "source.")
+ if not no_checkout and target_consistency < Consistency.CACHED and target_consistency._source_cached():
+ raise StreamError(
+ "Could not stage uncached source. For {} ".format(target.name)
+ + "Use `--track` to track and "
+ + "fetch the latest version of the "
+ + "source."
+ )
if not custom_dir:
directory = os.path.abspath(os.path.join(self._context.workspacedir, target.name))
- if directory[-4:] == '.bst':
+ if directory[-4:] == ".bst":
directory = directory[:-4]
expanded_directories.append(directory)
if custom_dir:
if len(elements) != 1:
- raise StreamError("Exactly one element can be given if --directory is used",
- reason='directory-with-multiple-elements')
+ raise StreamError(
+ "Exactly one element can be given if --directory is used",
+ reason="directory-with-multiple-elements",
+ )
directory = os.path.abspath(custom_dir)
- expanded_directories = [directory, ]
+ expanded_directories = [
+ directory,
+ ]
else:
# If this fails it is a bug in whatever calls this, usually cli.py, and so cannot be tested for via the
# run bst test mechanism.
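The odd-looking explosion of `expanded_directories = [directory, ]` into three lines in the hunk above is the formatter's "magic trailing comma" rule (assuming Black): a trailing comma already present in the source forces one element per line. For example:

    directory = "/tmp/workspace"   # placeholder value for the sketch

    # A trailing comma in the source keeps the list exploded:
    expanded_directories = [
        directory,
    ]
    # Without the trailing comma it collapses to a single line:
    expanded_directories = [directory]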
@@ -854,12 +890,16 @@ class Stream():
for target, directory in zip(elements, expanded_directories):
if os.path.exists(directory):
if not os.path.isdir(directory):
- raise StreamError("For element '{}', Directory path is not a directory: {}"
- .format(target.name, directory), reason='bad-directory')
+ raise StreamError(
+ "For element '{}', Directory path is not a directory: {}".format(target.name, directory),
+ reason="bad-directory",
+ )
if not (no_checkout or force) and os.listdir(directory):
- raise StreamError("For element '{}', Directory path is not empty: {}"
- .format(target.name, directory), reason='bad-directory')
+ raise StreamError(
+ "For element '{}', Directory path is not empty: {}".format(target.name, directory),
+ reason="bad-directory",
+ )
if os.listdir(directory):
if force and not no_checkout:
utils._force_rmtree(directory)
@@ -868,8 +908,7 @@ class Stream():
# Now it does the bits that cannot be made atomic.
targetGenerator = zip(elements, expanded_directories)
for target, directory in targetGenerator:
- self._message(MessageType.INFO, "Creating workspace for element {}"
- .format(target.name))
+ self._message(MessageType.INFO, "Creating workspace for element {}".format(target.name))
workspace = workspaces.get_workspace(target._get_full_name())
if workspace and not no_checkout:
@@ -886,8 +925,7 @@ class Stream():
raise StreamError("Failed to create workspace directory: {}".format(e) + todo_elements) from e
workspaces.create_workspace(target, directory, checkout=not no_checkout)
- self._message(MessageType.INFO, "Created a workspace for element: {}"
- .format(target._get_full_name()))
+ self._message(MessageType.INFO, "Created a workspace for element: {}".format(target._get_full_name()))
# workspace_close
#
@@ -903,13 +941,13 @@ class Stream():
# Remove workspace directory if prompted
if remove_dir:
- with self._context.messenger.timed_activity("Removing workspace directory {}"
- .format(workspace.get_absolute_path())):
+ with self._context.messenger.timed_activity(
+ "Removing workspace directory {}".format(workspace.get_absolute_path())
+ ):
try:
shutil.rmtree(workspace.get_absolute_path())
except OSError as e:
- raise StreamError("Could not remove '{}': {}"
- .format(workspace.get_absolute_path(), e)) from e
+ raise StreamError("Could not remove '{}': {}".format(workspace.get_absolute_path(), e)) from e
# Delete the workspace and save the configuration
workspaces.delete_workspace(element_name)
@@ -928,9 +966,9 @@ class Stream():
#
def workspace_reset(self, targets, *, soft, track_first):
- elements, _ = self._load(targets, [],
- selection=PipelineSelection.REDIRECT,
- track_selection=PipelineSelection.REDIRECT)
+ elements, _ = self._load(
+ targets, [], selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT
+ )
nonexisting = []
for element in elements:
@@ -946,14 +984,20 @@ class Stream():
if soft:
workspace.prepared = False
- self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
- .format(element.name, workspace_path))
+ self._message(
+ MessageType.INFO, "Reset workspace state for {} at: {}".format(element.name, workspace_path)
+ )
continue
self.workspace_close(element._get_full_name(), remove_dir=True)
workspaces.save_config()
- self.workspace_open([element._get_full_name()],
- no_checkout=False, track_first=track_first, force=True, custom_dir=workspace_path)
+ self.workspace_open(
+ [element._get_full_name()],
+ no_checkout=False,
+ track_first=track_first,
+ force=True,
+ custom_dir=workspace_path,
+ )
# workspace_exists
#
@@ -1001,14 +1045,12 @@ class Stream():
workspaces = []
for element_name, workspace_ in self._context.get_workspaces().list():
workspace_detail = {
- 'element': element_name,
- 'directory': workspace_.get_absolute_path(),
+ "element": element_name,
+ "directory": workspace_.get_absolute_path(),
}
workspaces.append(workspace_detail)
- _yaml.roundtrip_dump({
- 'workspaces': workspaces
- })
+ _yaml.roundtrip_dump({"workspaces": workspaces})
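For reference, the mapping handed to `_yaml.roundtrip_dump` above would serialize along these lines (the element name and path are made-up examples):

    # The structure being dumped, with hypothetical values:
    data = {
        "workspaces": [
            {"element": "hello.bst", "directory": "/home/user/ws/hello"},
        ]
    }
    # Expected YAML output, roughly:
    #
    #   workspaces:
    #   - element: hello.bst
    #     directory: /home/user/ws/hello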
# redirect_element_names()
#
@@ -1034,9 +1076,9 @@ class Stream():
else:
output_elements.add(e)
if load_elements:
- loaded_elements, _ = self._load(load_elements, (),
- selection=PipelineSelection.REDIRECT,
- track_selection=PipelineSelection.REDIRECT)
+ loaded_elements, _ = self._load(
+ load_elements, (), selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT
+ )
for e in loaded_elements:
output_elements.add(e.name)
@@ -1166,26 +1208,31 @@ class Stream():
# (list of Element): The primary element selection
# (list of Element): The tracking element selection
#
- def _load(self, targets, track_targets, *,
- selection=PipelineSelection.NONE,
- track_selection=PipelineSelection.NONE,
- except_targets=(),
- track_except_targets=(),
- track_cross_junctions=False,
- ignore_junction_targets=False,
- use_artifact_config=False,
- use_source_config=False,
- artifact_remote_url=None,
- source_remote_url=None,
- dynamic_plan=False,
- load_refs=False):
+ def _load(
+ self,
+ targets,
+ track_targets,
+ *,
+ selection=PipelineSelection.NONE,
+ track_selection=PipelineSelection.NONE,
+ except_targets=(),
+ track_except_targets=(),
+ track_cross_junctions=False,
+ ignore_junction_targets=False,
+ use_artifact_config=False,
+ use_source_config=False,
+ artifact_remote_url=None,
+ source_remote_url=None,
+ dynamic_plan=False,
+ load_refs=False
+ ):
# Classify element and artifact strings
target_elements, target_artifacts = self._classify_artifacts(targets)
if target_artifacts:
if not load_refs:
- detail = '\n'.join(target_artifacts)
+ detail = "\n".join(target_artifacts)
raise ArtifactElementError("Cannot perform this operation with artifact refs:", detail=detail)
if selection in (PipelineSelection.ALL, PipelineSelection.RUN):
raise StreamError("Error: '--deps {}' is not supported for artifact refs".format(selection))
@@ -1198,8 +1245,9 @@ class Stream():
# Load all target elements
loadable = [target_elements, except_targets, track_targets, track_except_targets]
if any(loadable):
- elements, except_elements, track_elements, track_except_elements = \
- self._pipeline.load(loadable, rewritable=rewritable)
+ elements, except_elements, track_elements, track_except_elements = self._pipeline.load(
+ loadable, rewritable=rewritable
+ )
else:
elements, except_elements, track_elements, track_except_elements = [], [], [], []
@@ -1208,7 +1256,7 @@ class Stream():
# Optionally filter out junction elements
if ignore_junction_targets:
- elements = [e for e in elements if e.get_kind() != 'junction']
+ elements = [e for e in elements if e.get_kind() != "junction"]
# Hold on to the targets
self.targets = elements + artifacts
@@ -1233,14 +1281,10 @@ class Stream():
for project, project_elements in track_projects.items():
selected = self._pipeline.get_selection(project_elements, track_selection)
- selected = self._pipeline.track_cross_junction_filter(project,
- selected,
- track_cross_junctions)
+ selected = self._pipeline.track_cross_junction_filter(project, selected, track_cross_junctions)
track_selected.extend(selected)
- track_selected = self._pipeline.except_elements(track_elements,
- track_selected,
- track_except_elements)
+ track_selected = self._pipeline.except_elements(track_elements, track_selected, track_except_elements)
if not targets:
return [], track_selected
@@ -1257,9 +1301,7 @@ class Stream():
#
self._pipeline.resolve_elements(self.targets)
selected = self._pipeline.get_selection(self.targets, selection, silent=False)
- selected = self._pipeline.except_elements(self.targets,
- selected,
- except_elements)
+ selected = self._pipeline.except_elements(self.targets, selected, except_elements)
if selection == PipelineSelection.PLAN and dynamic_plan:
# We use a dynamic build plan, only request artifacts of top-level targets,
@@ -1279,8 +1321,7 @@ class Stream():
#
def _message(self, message_type, message, **kwargs):
args = dict(kwargs)
- self._context.messenger.message(
- Message(message_type, message, **args))
+ self._context.messenger.message(Message(message_type, message, **args))
# _add_queue()
#
@@ -1321,9 +1362,7 @@ class Stream():
# unique_id (str): A unique_id to load an Element instance
#
def _failure_retry(self, action_name, unique_id):
- notification = Notification(NotificationType.RETRY,
- job_action=action_name,
- element=unique_id)
+ notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id)
self._notify(notification)
# _run()
@@ -1370,8 +1409,7 @@ class Stream():
# Filter out elements with cached sources, only from the fetch plan
# let the track plan resolve new refs.
- cached = [elt for elt in fetch_plan
- if not elt._should_fetch(fetch_original)]
+ cached = [elt for elt in fetch_plan if not elt._should_fetch(fetch_original)]
fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached)
# Construct queues, enqueue and run
@@ -1406,21 +1444,16 @@ class Stream():
try:
os.makedirs(location, exist_ok=True)
except OSError as e:
- raise StreamError("Failed to create destination directory: '{}'"
- .format(e)) from e
+ raise StreamError("Failed to create destination directory: '{}'".format(e)) from e
if not os.access(location, os.W_OK):
- raise StreamError("Destination directory '{}' not writable"
- .format(location))
+ raise StreamError("Destination directory '{}' not writable".format(location))
if not force and os.listdir(location):
- raise StreamError("Destination directory '{}' not empty"
- .format(location))
- elif os.path.exists(location) and location != '-':
+ raise StreamError("Destination directory '{}' not empty".format(location))
+ elif os.path.exists(location) and location != "-":
if not os.access(location, os.W_OK):
- raise StreamError("Output file '{}' not writable"
- .format(location))
+ raise StreamError("Output file '{}' not writable".format(location))
if not force and os.path.exists(location):
- raise StreamError("Output file '{}' already exists"
- .format(location))
+ raise StreamError("Output file '{}' already exists".format(location))
# Helper function for checkout()
#
@@ -1433,13 +1466,16 @@ class Stream():
sandbox_vroot.export_files(directory, can_link=True, can_destroy=True)
# Helper function for source_checkout()
- def _source_checkout(self, elements,
- location=None,
- force=False,
- deps='none',
- tar=False,
- compression=None,
- include_build_scripts=False):
+ def _source_checkout(
+ self,
+ elements,
+ location=None,
+ force=False,
+ deps="none",
+ tar=False,
+ compression=None,
+ include_build_scripts=False,
+ ):
location = os.path.abspath(location)
# Stage all our sources in a temporary directory. This
@@ -1455,8 +1491,7 @@ class Stream():
else:
self._move_directory(temp_source_dir.name, location, force)
except OSError as e:
- raise StreamError("Failed to checkout sources to {}: {}"
- .format(location, e)) from e
+ raise StreamError("Failed to checkout sources to {}: {}".format(location, e)) from e
finally:
with suppress(FileNotFoundError):
temp_source_dir.cleanup()
@@ -1498,10 +1533,10 @@ class Stream():
# Create a tarball from the content of directory
def _create_tarball(self, directory, tar_name, compression):
if compression is None:
- compression = ''
+ compression = ""
mode = _handle_compression(compression)
try:
- with utils.save_file_atomic(tar_name, mode='wb') as f:
+ with utils.save_file_atomic(tar_name, mode="wb") as f:
tarball = tarfile.open(fileobj=f, mode=mode)
for item in os.listdir(str(directory)):
file_to_add = os.path.join(directory, item)
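`utils.save_file_atomic` above is a BuildStream helper, not reproduced in this diff; the usual write-to-temp-then-rename pattern its name implies looks roughly like this sketch:

    import os
    import tempfile

    def save_file_atomic_sketch(path, data):
        # Write into a temporary file in the same directory, then atomically
        # replace the destination, so readers never observe a partial file.
        dirname = os.path.dirname(os.path.abspath(path))
        fd, tmp = tempfile.mkstemp(dir=dirname)
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(data)
            os.replace(tmp, path)   # atomic on POSIX filesystems
        except BaseException:
            os.unlink(tmp)
            raise

    save_file_atomic_sketch("demo.bin", b"contents\n")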
@@ -1598,7 +1633,7 @@ class Stream():
artifact_globs = []
for target in targets:
- if target.endswith('.bst'):
+ if target.endswith(".bst"):
if any(c in "*?[" for c in target):
element_globs.append(target)
else:
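The `any(c in "*?[" for c in target)` test above is a quick scan for shell-style glob metacharacters, the same characters `fnmatch` treats as special. A self-contained version:

    def looks_like_glob(name):
        # "*", "?" and "[" are the metacharacters of shell-style globs.
        return any(c in "*?[" for c in name)

    assert looks_like_glob("core/*.bst")
    assert not looks_like_glob("hello.bst")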
@@ -1628,7 +1663,7 @@ class Stream():
for glob in artifact_globs:
artifact_refs.extend(self._artifacts.list_artifacts(glob=glob))
if not artifact_refs:
- self._message(MessageType.WARN, "No artifacts found for globs: {}".format(', '.join(artifact_globs)))
+ self._message(MessageType.WARN, "No artifacts found for globs: {}".format(", ".join(artifact_globs)))
return element_targets, artifact_refs
@@ -1648,8 +1683,7 @@ class Stream():
elif notification.notification_type == NotificationType.JOB_COMPLETE:
self._state.remove_task(notification.job_action, notification.full_name)
if notification.job_status == JobStatus.FAIL:
- self._state.fail_task(notification.job_action, notification.full_name,
- notification.element)
+ self._state.fail_task(notification.job_action, notification.full_name, notification.element)
elif notification.notification_type == NotificationType.SCHED_START_TIME:
self._starttime = notification.time
elif notification.notification_type == NotificationType.RUNNING:
@@ -1694,5 +1728,5 @@ class Stream():
# (str): The tarfile mode string
#
def _handle_compression(compression, *, to_stream=False):
- mode_prefix = 'w|' if to_stream else 'w:'
+ mode_prefix = "w|" if to_stream else "w:"
return mode_prefix + compression
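A usage note for the helper above: tarfile's "w:" modes require a seekable file, while "w|" selects streaming mode, which is what the write-to-stdout path in _export_artifact() needs. Restating the two-line helper so the checks run standalone:

    def _handle_compression(compression, *, to_stream=False):
        mode_prefix = "w|" if to_stream else "w:"
        return mode_prefix + compression

    assert _handle_compression("gz") == "w:gz"                  # seekable file
    assert _handle_compression("gz", to_stream=True) == "w|gz"  # non-seekable stream
    assert _handle_compression("") == "w:"                      # plain tar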