diff options
author | James Ennis <james.ennis@codethink.com> | 2019-01-15 15:07:15 +0000 |
---|---|---|
committer | James Ennis <james.ennis@codethink.com> | 2019-01-16 11:02:11 +0000 |
commit | 5b06e2dd0a77a5faf2c52558d825ed52d3cb6ed0 (patch) | |
tree | 87d0b980f04296f1e330e78f6f9a326a84a57d46 | |
parent | 6e12c3fe5d1067b97e1850355f4e69cc6905f8d4 (diff) | |
download | buildstream-jennis/push_pull_artifacts.tar.gz |
WIP: cli.py: Spawn pull jobs in another process to isolate gRPC callsjennis/push_pull_artifacts
-rw-r--r-- | buildstream/_frontend/cli.py | 59 |
1 files changed, 51 insertions, 8 deletions
diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py index 24012ba79..1752b8f2d 100644 --- a/buildstream/_frontend/cli.py +++ b/buildstream/_frontend/cli.py @@ -1,4 +1,6 @@ +import multiprocessing import os +import signal import sys from contextlib import ExitStack from fnmatch import fnmatch @@ -973,6 +975,9 @@ def artifact_pull(app, artifacts, deps, remote): none: No dependencies, just the element itself all: All dependencies """ + from .. import _signals + from .. import utils + from .._exceptions import CASError with app.initialized(session_name="Pull"): cache = app.context.artifactcache @@ -1016,14 +1021,52 @@ def artifact_pull(app, artifacts, deps, remote): # Pull buildtrees? excluded_subdirs = ["buildtree"] if app.context.pull_buildtrees else None - # Try to pull the artifact from one of the remotes - remotes = [cache.create_remote(spec) for spec in remotes] - for ref in artifacts: - if cache.contains_ref(ref): - continue - for remote in remotes: - if cache.pull_ref(ref, remote, exclude_subdirs=exclude_subdirs): - break + # Define function to be used by multiprocessing + def _pull_artifact(artifact_cache_obj, remote_specs, refs, excluded_subdirs, queue): + remotes = [artifact_cache_obj.create_remote(spec) for spec in remote_specs] + pull_attempts = {'skipped': [], 'pulled': [], 'failed': []} + for ref in refs: + if artifact_cache_obj.contains_ref(ref): + pull_attempts['skipped'].append(ref) + continue + + pulled = False + for remote in remotes: + try: + if artifact_cache_obj.pull_ref(ref, remote, excluded_subdirs=excluded_subdirs): + pull_attempts['pulled'].append((ref, remote.spec.url)) + break + except CASError as e: + queue.put(e) + raise + if not pulled: + pull_attempts['failed'].append(ref) + + queue.put(pull_attempts) + + q = multiprocessing.Queue() + p = multiprocessing.Process(target=_pull_artifact, args=(cache, remotes, artifacts, excluded_subdirs, q)) + try: + with _signals.blocked([signal.SIGINT], ignore=False): + p.start() + + result = q.get() + p.join() + except KeyboardInterrupt: + utils._kill_process_tree(p.pid) + raise + + # Output to user + if isinstance(result, dict): + for ref in result['skipped']: + click.echo("'{}' already available - pull skipped.".format(ref)) + for ref, remote in result['pulled']: + click.echo("'{}' pulled from '{}'.".format(ref, remote)) + for ref in result['failed']: + urls = [remote.url for remote in remotes] + click.echo("'{}' not available in remotes: {}".format(ref, urls)) # Ugly list print + else: + raise result ################################################################## |