diff options
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r-- | src/buildstream/_stream.py | 538 |
1 files changed, 286 insertions, 252 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index f09a46185..2515fadce 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -33,8 +33,19 @@ from collections import deque from ._artifactelement import verify_artifact_ref, ArtifactElement from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError from ._message import Message, MessageType -from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ - SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus +from ._scheduler import ( + Scheduler, + SchedStatus, + TrackQueue, + FetchQueue, + SourcePushQueue, + BuildQueue, + PullQueue, + ArtifactPushQueue, + NotificationType, + Notification, + JobStatus, +) from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, PROFILER from ._state import State @@ -55,20 +66,18 @@ from . import Scope, Consistency # interrupt_callback (callable): A callback to invoke when we get interrupted # ticker_callback (callable): Invoked every second while running the scheduler # -class Stream(): - - def __init__(self, context, session_start, *, - session_start_callback=None, - interrupt_callback=None, - ticker_callback=None): +class Stream: + def __init__( + self, context, session_start, *, session_start_callback=None, interrupt_callback=None, ticker_callback=None + ): # # Public members # - self.targets = [] # Resolved target elements - self.session_elements = [] # List of elements being processed this session - self.total_elements = [] # Total list of elements based on targets - self.queues = [] # Queue objects + self.targets = [] # Resolved target elements + self.session_elements = [] # List of elements being processed this session + self.total_elements = [] # Total list of elements based on targets + self.queues = [] # Queue objects # # Private members @@ -84,8 +93,9 @@ class Stream(): context.messenger.set_state(self._state) - self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue, - self._scheduler_notification_handler) + self._scheduler = Scheduler( + context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler + ) self._first_non_track_queue = None self._session_start_callback = session_start_callback self._ticker_callback = ticker_callback @@ -138,17 +148,24 @@ class Stream(): # # Returns: # (list of Element): The selected elements - def load_selection(self, targets, *, - selection=PipelineSelection.NONE, - except_targets=(), - use_artifact_config=False, - load_refs=False): + def load_selection( + self, + targets, + *, + selection=PipelineSelection.NONE, + except_targets=(), + use_artifact_config=False, + load_refs=False + ): with PROFILER.profile(Topics.LOAD_SELECTION, "_".join(t.replace(os.sep, "-") for t in targets)): - target_objects, _ = self._load(targets, (), - selection=selection, - except_targets=except_targets, - use_artifact_config=use_artifact_config, - load_refs=load_refs) + target_objects, _ = self._load( + targets, + (), + selection=selection, + except_targets=except_targets, + use_artifact_config=use_artifact_config, + load_refs=load_refs, + ) return target_objects @@ -171,14 +188,20 @@ class Stream(): # Returns: # (int): The exit code of the launched shell # - def shell(self, element, scope, prompt, *, - directory=None, - mounts=None, - isolate=False, - command=None, - usebuildtree=None, - pull_dependencies=None, - unique_id=None): + def shell( + self, + element, + scope, + prompt, + *, + directory=None, + mounts=None, + isolate=False, + command=None, + usebuildtree=None, + pull_dependencies=None, + unique_id=None + ): # Load the Element via the unique_id if given if unique_id and element is None: @@ -192,18 +215,16 @@ class Stream(): if not element._source_cached(): raise StreamError( "Sources for element {} are not cached." - "Element must be fetched.".format(element._get_full_name())) + "Element must be fetched.".format(element._get_full_name()) + ) - missing_deps = [ - dep for dep in self._pipeline.dependencies([element], scope) - if not dep._cached() - ] + missing_deps = [dep for dep in self._pipeline.dependencies([element], scope) if not dep._cached()] if missing_deps: if not pull_dependencies: raise StreamError( "Elements need to be built or downloaded before staging a shell environment", - detail="\n" - .join(list(map(lambda x: x._get_full_name(), missing_deps)))) + detail="\n".join(list(map(lambda x: x._get_full_name(), missing_deps))), + ) self._message(MessageType.INFO, "Attempting to fetch missing or incomplete artifacts") self._scheduler.clear_queues() self._add_queue(PullQueue(self._scheduler)) @@ -236,8 +257,9 @@ class Stream(): else: buildtree = True - return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, - usebuildtree=buildtree) + return element._shell( + scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree + ) # build() # @@ -252,23 +274,22 @@ class Stream(): # If `remote` specified as None, then regular configuration will be used # to determine where to push artifacts to. # - def build(self, targets, *, - selection=PipelineSelection.PLAN, - ignore_junction_targets=False, - remote=None): + def build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None): use_config = True if remote: use_config = False - elements, _ = \ - self._load(targets, [], - selection=selection, - ignore_junction_targets=ignore_junction_targets, - use_artifact_config=use_config, - artifact_remote_url=remote, - use_source_config=True, - dynamic_plan=True) + elements, _ = self._load( + targets, + [], + selection=selection, + ignore_junction_targets=ignore_junction_targets, + use_artifact_config=use_config, + artifact_remote_url=remote, + use_source_config=True, + dynamic_plan=True, + ) # Assert that the elements are consistent self._pipeline.assert_consistent(elements) @@ -317,12 +338,16 @@ class Stream(): # track_cross_junctions (bool): Whether tracking should cross junction boundaries # remote (str|None): The URL of a specific remote server to pull from. # - def fetch(self, targets, *, - selection=PipelineSelection.PLAN, - except_targets=None, - track_targets=False, - track_cross_junctions=False, - remote=None): + def fetch( + self, + targets, + *, + selection=PipelineSelection.PLAN, + except_targets=None, + track_targets=False, + track_cross_junctions=False, + remote=None + ): if track_targets: track_targets = targets @@ -337,14 +362,17 @@ class Stream(): if remote: use_source_config = False - elements, track_elements = \ - self._load(targets, track_targets, - selection=selection, track_selection=track_selection, - except_targets=except_targets, - track_except_targets=track_except_targets, - track_cross_junctions=track_cross_junctions, - use_source_config=use_source_config, - source_remote_url=remote) + elements, track_elements = self._load( + targets, + track_targets, + selection=selection, + track_selection=track_selection, + except_targets=except_targets, + track_except_targets=track_except_targets, + track_cross_junctions=track_cross_junctions, + use_source_config=use_source_config, + source_remote_url=remote, + ) # Delegated to a shared fetch method self._fetch(elements, track_elements=track_elements) @@ -362,20 +390,20 @@ class Stream(): # If no error is encountered while tracking, then the project files # are rewritten inline. # - def track(self, targets, *, - selection=PipelineSelection.REDIRECT, - except_targets=None, - cross_junctions=False): + def track(self, targets, *, selection=PipelineSelection.REDIRECT, except_targets=None, cross_junctions=False): # We pass no target to build. Only to track. Passing build targets # would fully load project configuration which might not be # possible before tracking is done. - _, elements = \ - self._load([], targets, - selection=selection, track_selection=selection, - except_targets=except_targets, - track_except_targets=except_targets, - track_cross_junctions=cross_junctions) + _, elements = self._load( + [], + targets, + selection=selection, + track_selection=selection, + except_targets=except_targets, + track_except_targets=except_targets, + track_cross_junctions=cross_junctions, + ) # FIXME: this can be refactored after element._update_state is simplified/removed elements = [element for element in elements if element._schedule_tracking()] @@ -400,21 +428,21 @@ class Stream(): # If `remote` specified as None, then regular configuration will be used # to determine where to pull artifacts from. # - def pull(self, targets, *, - selection=PipelineSelection.NONE, - ignore_junction_targets=False, - remote=None): + def pull(self, targets, *, selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None): use_config = True if remote: use_config = False - elements, _ = self._load(targets, (), - selection=selection, - ignore_junction_targets=ignore_junction_targets, - use_artifact_config=use_config, - artifact_remote_url=remote, - load_refs=True) + elements, _ = self._load( + targets, + (), + selection=selection, + ignore_junction_targets=ignore_junction_targets, + use_artifact_config=use_config, + artifact_remote_url=remote, + load_refs=True, + ) if not self._artifacts.has_fetch_remotes(): raise StreamError("No artifact caches available for pulling artifacts") @@ -442,21 +470,21 @@ class Stream(): # a pull queue will be created if user context and available remotes allow for # attempting to fetch them. # - def push(self, targets, *, - selection=PipelineSelection.NONE, - ignore_junction_targets=False, - remote=None): + def push(self, targets, *, selection=PipelineSelection.NONE, ignore_junction_targets=False, remote=None): use_config = True if remote: use_config = False - elements, _ = self._load(targets, (), - selection=selection, - ignore_junction_targets=ignore_junction_targets, - use_artifact_config=use_config, - artifact_remote_url=remote, - load_refs=True) + elements, _ = self._load( + targets, + (), + selection=selection, + ignore_junction_targets=ignore_junction_targets, + use_artifact_config=use_config, + artifact_remote_url=remote, + load_refs=True, + ) if not self._artifacts.has_push_remotes(): raise StreamError("No artifact caches available for pushing artifacts") @@ -500,8 +528,10 @@ class Stream(): # ready an uncached element in the PushQueue. if self._context.sched_error_action == _SchedulerErrorAction.CONTINUE and uncached_elements: names = [element.name for element in uncached_elements] - fail_str = "Error while pushing. The following elements were not pushed as they are " \ + fail_str = ( + "Error while pushing. The following elements were not pushed as they are " "not yet cached:\n\n\t{}\n".format("\n\t".join(names)) + ) raise StreamError(fail_str) @@ -525,15 +555,19 @@ class Stream(): # pull (bool): If true will attempt to pull any missing or incomplete # artifacts. # - def checkout(self, target, *, - location=None, - force=False, - selection=PipelineSelection.RUN, - integrate=True, - hardlinks=False, - compression='', - pull=False, - tar=False): + def checkout( + self, + target, + *, + location=None, + force=False, + selection=PipelineSelection.RUN, + integrate=True, + hardlinks=False, + compression="", + pull=False, + tar=False + ): elements, _ = self._load((target,), (), selection=selection, use_artifact_config=True, load_refs=True) @@ -554,15 +588,15 @@ class Stream(): self._run() try: - scope = {'run': Scope.RUN, 'build': Scope.BUILD, 'none': Scope.NONE, 'all': Scope.ALL} - with target._prepare_sandbox(scope=scope[selection], directory=None, - integrate=integrate) as sandbox: + scope = {"run": Scope.RUN, "build": Scope.BUILD, "none": Scope.NONE, "all": Scope.ALL} + with target._prepare_sandbox(scope=scope[selection], directory=None, integrate=integrate) as sandbox: # Copy or move the sandbox to the target directory virdir = sandbox.get_virtual_directory() self._export_artifact(tar, location, compression, target, hardlinks, virdir) except BstError as e: - raise StreamError("Error while staging dependencies into a sandbox" - ": '{}'".format(e), detail=e.detail, reason=e.reason) from e + raise StreamError( + "Error while staging dependencies into a sandbox" ": '{}'".format(e), detail=e.detail, reason=e.reason + ) from e # _export_artifact() # @@ -578,34 +612,32 @@ class Stream(): # def _export_artifact(self, tar, location, compression, target, hardlinks, virdir): if not tar: - with target.timed_activity("Checking out files in '{}'" - .format(location)): + with target.timed_activity("Checking out files in '{}'".format(location)): try: if hardlinks: self._checkout_hardlinks(virdir, location) else: virdir.export_files(location) except OSError as e: - raise StreamError("Failed to checkout files: '{}'" - .format(e)) from e + raise StreamError("Failed to checkout files: '{}'".format(e)) from e else: - to_stdout = location == '-' + to_stdout = location == "-" mode = _handle_compression(compression, to_stream=to_stdout) with target.timed_activity("Creating tarball"): if to_stdout: # Save the stdout FD to restore later saved_fd = os.dup(sys.stdout.fileno()) try: - with os.fdopen(sys.stdout.fileno(), 'wb') as fo: + with os.fdopen(sys.stdout.fileno(), "wb") as fo: with tarfile.open(fileobj=fo, mode=mode) as tf: - virdir.export_to_tar(tf, '.') + virdir.export_to_tar(tf, ".") finally: # No matter what, restore stdout for further use os.dup2(saved_fd, sys.stdout.fileno()) os.close(saved_fd) else: with tarfile.open(location, mode=mode) as tf: - virdir.export_to_tar(tf, '.') + virdir.export_to_tar(tf, ".") # artifact_show() # @@ -614,13 +646,9 @@ class Stream(): # Args: # targets (str): Targets to show the cached state of # - def artifact_show(self, targets, *, - selection=PipelineSelection.NONE): + def artifact_show(self, targets, *, selection=PipelineSelection.NONE): # Obtain list of Element and/or ArtifactElement objects - target_objects = self.load_selection(targets, - selection=selection, - use_artifact_config=True, - load_refs=True) + target_objects = self.load_selection(targets, selection=selection, use_artifact_config=True, load_refs=True) if self._artifacts.has_fetch_remotes(): self._pipeline.check_remotes(target_objects) @@ -695,8 +723,7 @@ class Stream(): # Args: # targets (str): Targets to remove # - def artifact_delete(self, targets, *, - selection=PipelineSelection.NONE): + def artifact_delete(self, targets, *, selection=PipelineSelection.NONE): # Return list of Element and/or ArtifactElement objects target_objects = self.load_selection(targets, selection=selection, load_refs=True) @@ -736,20 +763,22 @@ class Stream(): # compression (str): The type of compression for tarball # include_build_scripts (bool): Whether to include build scripts in the checkout # - def source_checkout(self, target, *, - location=None, - force=False, - deps='none', - except_targets=(), - tar=False, - compression=None, - include_build_scripts=False): + def source_checkout( + self, + target, + *, + location=None, + force=False, + deps="none", + except_targets=(), + tar=False, + compression=None, + include_build_scripts=False + ): self._check_location_writable(location, force=force, tar=tar) - elements, _ = self._load((target,), (), - selection=deps, - except_targets=except_targets) + elements, _ = self._load((target,), (), selection=deps, except_targets=except_targets) # Assert all sources are cached in the source dir self._fetch(elements) @@ -757,11 +786,11 @@ class Stream(): # Stage all sources determined by scope try: - self._source_checkout(elements, location, force, deps, - tar, compression, include_build_scripts) + self._source_checkout(elements, location, force, deps, tar, compression, include_build_scripts) except BstError as e: - raise StreamError("Error while writing sources" - ": '{}'".format(e), detail=e.detail, reason=e.reason) from e + raise StreamError( + "Error while writing sources" ": '{}'".format(e), detail=e.detail, reason=e.reason + ) from e self._message(MessageType.INFO, "Checked out sources to '{}'".format(location)) @@ -776,11 +805,7 @@ class Stream(): # force (bool): Whether to ignore contents in an existing directory # custom_dir (str): Custom location to create a workspace or false to use default location. # - def workspace_open(self, targets, *, - no_checkout, - track_first, - force, - custom_dir): + def workspace_open(self, targets, *, no_checkout, track_first, force, custom_dir): # This function is a little funny but it is trying to be as atomic as possible. if track_first: @@ -788,9 +813,9 @@ class Stream(): else: track_targets = () - elements, track_elements = self._load(targets, track_targets, - selection=PipelineSelection.REDIRECT, - track_selection=PipelineSelection.REDIRECT) + elements, track_elements = self._load( + targets, track_targets, selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT + ) workspaces = self._context.get_workspaces() @@ -819,33 +844,44 @@ class Stream(): workspace = workspaces.get_workspace(target._get_full_name()) if workspace: if not force: - raise StreamError("Element '{}' already has an open workspace defined at: {}" - .format(target.name, workspace.get_absolute_path())) + raise StreamError( + "Element '{}' already has an open workspace defined at: {}".format( + target.name, workspace.get_absolute_path() + ) + ) if not no_checkout: - target.warn("Replacing existing workspace for element '{}' defined at: {}" - .format(target.name, workspace.get_absolute_path())) + target.warn( + "Replacing existing workspace for element '{}' defined at: {}".format( + target.name, workspace.get_absolute_path() + ) + ) self.workspace_close(target._get_full_name(), remove_dir=not no_checkout) target_consistency = target._get_consistency() - if not no_checkout and target_consistency < Consistency.CACHED and \ - target_consistency._source_cached(): - raise StreamError("Could not stage uncached source. For {} ".format(target.name) + - "Use `--track` to track and " + - "fetch the latest version of the " + - "source.") + if not no_checkout and target_consistency < Consistency.CACHED and target_consistency._source_cached(): + raise StreamError( + "Could not stage uncached source. For {} ".format(target.name) + + "Use `--track` to track and " + + "fetch the latest version of the " + + "source." + ) if not custom_dir: directory = os.path.abspath(os.path.join(self._context.workspacedir, target.name)) - if directory[-4:] == '.bst': + if directory[-4:] == ".bst": directory = directory[:-4] expanded_directories.append(directory) if custom_dir: if len(elements) != 1: - raise StreamError("Exactly one element can be given if --directory is used", - reason='directory-with-multiple-elements') + raise StreamError( + "Exactly one element can be given if --directory is used", + reason="directory-with-multiple-elements", + ) directory = os.path.abspath(custom_dir) - expanded_directories = [directory, ] + expanded_directories = [ + directory, + ] else: # If this fails it is a bug in what ever calls this, usually cli.py and so can not be tested for via the # run bst test mechanism. @@ -854,12 +890,16 @@ class Stream(): for target, directory in zip(elements, expanded_directories): if os.path.exists(directory): if not os.path.isdir(directory): - raise StreamError("For element '{}', Directory path is not a directory: {}" - .format(target.name, directory), reason='bad-directory') + raise StreamError( + "For element '{}', Directory path is not a directory: {}".format(target.name, directory), + reason="bad-directory", + ) if not (no_checkout or force) and os.listdir(directory): - raise StreamError("For element '{}', Directory path is not empty: {}" - .format(target.name, directory), reason='bad-directory') + raise StreamError( + "For element '{}', Directory path is not empty: {}".format(target.name, directory), + reason="bad-directory", + ) if os.listdir(directory): if force and not no_checkout: utils._force_rmtree(directory) @@ -868,8 +908,7 @@ class Stream(): # Now it does the bits that can not be made atomic. targetGenerator = zip(elements, expanded_directories) for target, directory in targetGenerator: - self._message(MessageType.INFO, "Creating workspace for element {}" - .format(target.name)) + self._message(MessageType.INFO, "Creating workspace for element {}".format(target.name)) workspace = workspaces.get_workspace(target._get_full_name()) if workspace and not no_checkout: @@ -886,8 +925,7 @@ class Stream(): raise StreamError("Failed to create workspace directory: {}".format(e) + todo_elements) from e workspaces.create_workspace(target, directory, checkout=not no_checkout) - self._message(MessageType.INFO, "Created a workspace for element: {}" - .format(target._get_full_name())) + self._message(MessageType.INFO, "Created a workspace for element: {}".format(target._get_full_name())) # workspace_close # @@ -903,13 +941,13 @@ class Stream(): # Remove workspace directory if prompted if remove_dir: - with self._context.messenger.timed_activity("Removing workspace directory {}" - .format(workspace.get_absolute_path())): + with self._context.messenger.timed_activity( + "Removing workspace directory {}".format(workspace.get_absolute_path()) + ): try: shutil.rmtree(workspace.get_absolute_path()) except OSError as e: - raise StreamError("Could not remove '{}': {}" - .format(workspace.get_absolute_path(), e)) from e + raise StreamError("Could not remove '{}': {}".format(workspace.get_absolute_path(), e)) from e # Delete the workspace and save the configuration workspaces.delete_workspace(element_name) @@ -928,9 +966,9 @@ class Stream(): # def workspace_reset(self, targets, *, soft, track_first): - elements, _ = self._load(targets, [], - selection=PipelineSelection.REDIRECT, - track_selection=PipelineSelection.REDIRECT) + elements, _ = self._load( + targets, [], selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT + ) nonexisting = [] for element in elements: @@ -946,14 +984,20 @@ class Stream(): if soft: workspace.prepared = False - self._message(MessageType.INFO, "Reset workspace state for {} at: {}" - .format(element.name, workspace_path)) + self._message( + MessageType.INFO, "Reset workspace state for {} at: {}".format(element.name, workspace_path) + ) continue self.workspace_close(element._get_full_name(), remove_dir=True) workspaces.save_config() - self.workspace_open([element._get_full_name()], - no_checkout=False, track_first=track_first, force=True, custom_dir=workspace_path) + self.workspace_open( + [element._get_full_name()], + no_checkout=False, + track_first=track_first, + force=True, + custom_dir=workspace_path, + ) # workspace_exists # @@ -1001,14 +1045,12 @@ class Stream(): workspaces = [] for element_name, workspace_ in self._context.get_workspaces().list(): workspace_detail = { - 'element': element_name, - 'directory': workspace_.get_absolute_path(), + "element": element_name, + "directory": workspace_.get_absolute_path(), } workspaces.append(workspace_detail) - _yaml.roundtrip_dump({ - 'workspaces': workspaces - }) + _yaml.roundtrip_dump({"workspaces": workspaces}) # redirect_element_names() # @@ -1034,9 +1076,9 @@ class Stream(): else: output_elements.add(e) if load_elements: - loaded_elements, _ = self._load(load_elements, (), - selection=PipelineSelection.REDIRECT, - track_selection=PipelineSelection.REDIRECT) + loaded_elements, _ = self._load( + load_elements, (), selection=PipelineSelection.REDIRECT, track_selection=PipelineSelection.REDIRECT + ) for e in loaded_elements: output_elements.add(e.name) @@ -1166,26 +1208,31 @@ class Stream(): # (list of Element): The primary element selection # (list of Element): The tracking element selection # - def _load(self, targets, track_targets, *, - selection=PipelineSelection.NONE, - track_selection=PipelineSelection.NONE, - except_targets=(), - track_except_targets=(), - track_cross_junctions=False, - ignore_junction_targets=False, - use_artifact_config=False, - use_source_config=False, - artifact_remote_url=None, - source_remote_url=None, - dynamic_plan=False, - load_refs=False): + def _load( + self, + targets, + track_targets, + *, + selection=PipelineSelection.NONE, + track_selection=PipelineSelection.NONE, + except_targets=(), + track_except_targets=(), + track_cross_junctions=False, + ignore_junction_targets=False, + use_artifact_config=False, + use_source_config=False, + artifact_remote_url=None, + source_remote_url=None, + dynamic_plan=False, + load_refs=False + ): # Classify element and artifact strings target_elements, target_artifacts = self._classify_artifacts(targets) if target_artifacts: if not load_refs: - detail = '\n'.join(target_artifacts) + detail = "\n".join(target_artifacts) raise ArtifactElementError("Cannot perform this operation with artifact refs:", detail=detail) if selection in (PipelineSelection.ALL, PipelineSelection.RUN): raise StreamError("Error: '--deps {}' is not supported for artifact refs".format(selection)) @@ -1198,8 +1245,9 @@ class Stream(): # Load all target elements loadable = [target_elements, except_targets, track_targets, track_except_targets] if any(loadable): - elements, except_elements, track_elements, track_except_elements = \ - self._pipeline.load(loadable, rewritable=rewritable) + elements, except_elements, track_elements, track_except_elements = self._pipeline.load( + loadable, rewritable=rewritable + ) else: elements, except_elements, track_elements, track_except_elements = [], [], [], [] @@ -1208,7 +1256,7 @@ class Stream(): # Optionally filter out junction elements if ignore_junction_targets: - elements = [e for e in elements if e.get_kind() != 'junction'] + elements = [e for e in elements if e.get_kind() != "junction"] # Hold on to the targets self.targets = elements + artifacts @@ -1233,14 +1281,10 @@ class Stream(): for project, project_elements in track_projects.items(): selected = self._pipeline.get_selection(project_elements, track_selection) - selected = self._pipeline.track_cross_junction_filter(project, - selected, - track_cross_junctions) + selected = self._pipeline.track_cross_junction_filter(project, selected, track_cross_junctions) track_selected.extend(selected) - track_selected = self._pipeline.except_elements(track_elements, - track_selected, - track_except_elements) + track_selected = self._pipeline.except_elements(track_elements, track_selected, track_except_elements) if not targets: return [], track_selected @@ -1257,9 +1301,7 @@ class Stream(): # self._pipeline.resolve_elements(self.targets) selected = self._pipeline.get_selection(self.targets, selection, silent=False) - selected = self._pipeline.except_elements(self.targets, - selected, - except_elements) + selected = self._pipeline.except_elements(self.targets, selected, except_elements) if selection == PipelineSelection.PLAN and dynamic_plan: # We use a dynamic build plan, only request artifacts of top-level targets, @@ -1279,8 +1321,7 @@ class Stream(): # def _message(self, message_type, message, **kwargs): args = dict(kwargs) - self._context.messenger.message( - Message(message_type, message, **args)) + self._context.messenger.message(Message(message_type, message, **args)) # _add_queue() # @@ -1321,9 +1362,7 @@ class Stream(): # unique_id (str): A unique_id to load an Element instance # def _failure_retry(self, action_name, unique_id): - notification = Notification(NotificationType.RETRY, - job_action=action_name, - element=unique_id) + notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id) self._notify(notification) # _run() @@ -1370,8 +1409,7 @@ class Stream(): # Filter out elements with cached sources, only from the fetch plan # let the track plan resolve new refs. - cached = [elt for elt in fetch_plan - if not elt._should_fetch(fetch_original)] + cached = [elt for elt in fetch_plan if not elt._should_fetch(fetch_original)] fetch_plan = self._pipeline.subtract_elements(fetch_plan, cached) # Construct queues, enqueue and run @@ -1406,21 +1444,16 @@ class Stream(): try: os.makedirs(location, exist_ok=True) except OSError as e: - raise StreamError("Failed to create destination directory: '{}'" - .format(e)) from e + raise StreamError("Failed to create destination directory: '{}'".format(e)) from e if not os.access(location, os.W_OK): - raise StreamError("Destination directory '{}' not writable" - .format(location)) + raise StreamError("Destination directory '{}' not writable".format(location)) if not force and os.listdir(location): - raise StreamError("Destination directory '{}' not empty" - .format(location)) - elif os.path.exists(location) and location != '-': + raise StreamError("Destination directory '{}' not empty".format(location)) + elif os.path.exists(location) and location != "-": if not os.access(location, os.W_OK): - raise StreamError("Output file '{}' not writable" - .format(location)) + raise StreamError("Output file '{}' not writable".format(location)) if not force and os.path.exists(location): - raise StreamError("Output file '{}' already exists" - .format(location)) + raise StreamError("Output file '{}' already exists".format(location)) # Helper function for checkout() # @@ -1433,13 +1466,16 @@ class Stream(): sandbox_vroot.export_files(directory, can_link=True, can_destroy=True) # Helper function for source_checkout() - def _source_checkout(self, elements, - location=None, - force=False, - deps='none', - tar=False, - compression=None, - include_build_scripts=False): + def _source_checkout( + self, + elements, + location=None, + force=False, + deps="none", + tar=False, + compression=None, + include_build_scripts=False, + ): location = os.path.abspath(location) # Stage all our sources in a temporary directory. The this @@ -1455,8 +1491,7 @@ class Stream(): else: self._move_directory(temp_source_dir.name, location, force) except OSError as e: - raise StreamError("Failed to checkout sources to {}: {}" - .format(location, e)) from e + raise StreamError("Failed to checkout sources to {}: {}".format(location, e)) from e finally: with suppress(FileNotFoundError): temp_source_dir.cleanup() @@ -1498,10 +1533,10 @@ class Stream(): # Create a tarball from the content of directory def _create_tarball(self, directory, tar_name, compression): if compression is None: - compression = '' + compression = "" mode = _handle_compression(compression) try: - with utils.save_file_atomic(tar_name, mode='wb') as f: + with utils.save_file_atomic(tar_name, mode="wb") as f: tarball = tarfile.open(fileobj=f, mode=mode) for item in os.listdir(str(directory)): file_to_add = os.path.join(directory, item) @@ -1598,7 +1633,7 @@ class Stream(): artifact_globs = [] for target in targets: - if target.endswith('.bst'): + if target.endswith(".bst"): if any(c in "*?[" for c in target): element_globs.append(target) else: @@ -1628,7 +1663,7 @@ class Stream(): for glob in artifact_globs: artifact_refs.extend(self._artifacts.list_artifacts(glob=glob)) if not artifact_refs: - self._message(MessageType.WARN, "No artifacts found for globs: {}".format(', '.join(artifact_globs))) + self._message(MessageType.WARN, "No artifacts found for globs: {}".format(", ".join(artifact_globs))) return element_targets, artifact_refs @@ -1648,8 +1683,7 @@ class Stream(): elif notification.notification_type == NotificationType.JOB_COMPLETE: self._state.remove_task(notification.job_action, notification.full_name) if notification.job_status == JobStatus.FAIL: - self._state.fail_task(notification.job_action, notification.full_name, - notification.element) + self._state.fail_task(notification.job_action, notification.full_name, notification.element) elif notification.notification_type == NotificationType.SCHED_START_TIME: self._starttime = notification.time elif notification.notification_type == NotificationType.RUNNING: @@ -1694,5 +1728,5 @@ class Stream(): # (str): The tarfile mode string # def _handle_compression(compression, *, to_stream=False): - mode_prefix = 'w|' if to_stream else 'w:' + mode_prefix = "w|" if to_stream else "w:" return mode_prefix + compression |