summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Ennis <james.ennis@codethink.com>2019-01-15 15:07:15 +0000
committerJames Ennis <james.ennis@codethink.com>2019-01-16 11:02:11 +0000
commit5b06e2dd0a77a5faf2c52558d825ed52d3cb6ed0 (patch)
tree87d0b980f04296f1e330e78f6f9a326a84a57d46
parent6e12c3fe5d1067b97e1850355f4e69cc6905f8d4 (diff)
downloadbuildstream-jennis/push_pull_artifacts.tar.gz
WIP: cli.py: Spawn pull jobs in another process to isolate gRPC callsjennis/push_pull_artifacts
-rw-r--r--buildstream/_frontend/cli.py59
1 files changed, 51 insertions, 8 deletions
diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py
index 24012ba79..1752b8f2d 100644
--- a/buildstream/_frontend/cli.py
+++ b/buildstream/_frontend/cli.py
@@ -1,4 +1,6 @@
+import multiprocessing
import os
+import signal
import sys
from contextlib import ExitStack
from fnmatch import fnmatch
@@ -973,6 +975,9 @@ def artifact_pull(app, artifacts, deps, remote):
none: No dependencies, just the element itself
all: All dependencies
"""
+ from .. import _signals
+ from .. import utils
+ from .._exceptions import CASError
with app.initialized(session_name="Pull"):
cache = app.context.artifactcache
@@ -1016,14 +1021,52 @@ def artifact_pull(app, artifacts, deps, remote):
# Pull buildtrees?
excluded_subdirs = ["buildtree"] if app.context.pull_buildtrees else None
- # Try to pull the artifact from one of the remotes
- remotes = [cache.create_remote(spec) for spec in remotes]
- for ref in artifacts:
- if cache.contains_ref(ref):
- continue
- for remote in remotes:
- if cache.pull_ref(ref, remote, exclude_subdirs=exclude_subdirs):
- break
+ # Define function to be used by multiprocessing
+ def _pull_artifact(artifact_cache_obj, remote_specs, refs, excluded_subdirs, queue):
+ remotes = [artifact_cache_obj.create_remote(spec) for spec in remote_specs]
+ pull_attempts = {'skipped': [], 'pulled': [], 'failed': []}
+ for ref in refs:
+ if artifact_cache_obj.contains_ref(ref):
+ pull_attempts['skipped'].append(ref)
+ continue
+
+ pulled = False
+ for remote in remotes:
+ try:
+ if artifact_cache_obj.pull_ref(ref, remote, excluded_subdirs=excluded_subdirs):
+ pull_attempts['pulled'].append((ref, remote.spec.url))
+ break
+ except CASError as e:
+ queue.put(e)
+ raise
+ if not pulled:
+ pull_attempts['failed'].append(ref)
+
+ queue.put(pull_attempts)
+
+ q = multiprocessing.Queue()
+ p = multiprocessing.Process(target=_pull_artifact, args=(cache, remotes, artifacts, excluded_subdirs, q))
+ try:
+ with _signals.blocked([signal.SIGINT], ignore=False):
+ p.start()
+
+ result = q.get()
+ p.join()
+ except KeyboardInterrupt:
+ utils._kill_process_tree(p.pid)
+ raise
+
+ # Output to user
+ if isinstance(result, dict):
+ for ref in result['skipped']:
+ click.echo("'{}' already available - pull skipped.".format(ref))
+ for ref, remote in result['pulled']:
+ click.echo("'{}' pulled from '{}'.".format(ref, remote))
+ for ref in result['failed']:
+ urls = [remote.url for remote in remotes]
+ click.echo("'{}' not available in remotes: {}".format(ref, urls)) # Ugly list print
+ else:
+ raise result
##################################################################