author     Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>  2019-01-03 12:21:58 +0000
committer  Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>  2019-01-15 11:57:02 +0000
commit     7329ef562a3df0d81cfeea20fbfb5cbba366b91a (patch)
tree       3ae40f346f1244049250d52b9cd1523e19b25d38
parent     15db919f6fafd0c2b0abbddde78a084d5b5379a5 (diff)
branch     raoul/802-refactor-artifactcache

artifactcache: implement new push methods
Similar to the pull methods, this implements a yield_directory_digests method that iterates over blobs in the local CAS, with upload_blob sending blobs to a remote and batching them where appropriate. Part of #802
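A rough sketch of the flow this patch introduces, mirroring the cas_directory_upload helper in the transfer.py hunk below (push_directory_sketch is a hypothetical name used only for illustration):

def push_directory_sketch(caslocal, casremote, root_digest):
    # Walk the local CAS, yielding the digest of every blob reachable
    # from root_digest: the directory itself, its files, subdirectories.
    required_blobs = caslocal.yield_directory_digests(root_digest)

    # Ask the remote which of those blobs it does not already have.
    missing_blobs = casremote.find_missing_blobs(required_blobs)

    # Upload each missing blob; small blobs accumulate into batch
    # requests, oversized ones are sent as individual bytestream writes.
    for blob in missing_blobs.values():
        casremote.upload_blob(blob, caslocal.objpath(blob))

    # Flush whatever is still queued in the final partial batch.
    casremote.send_update_batch()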
-rw-r--r--   buildstream/_artifactcache.py            41
-rw-r--r--   buildstream/_cas/cascache.py            149
-rw-r--r--   buildstream/_cas/casremote.py           102
-rw-r--r--   buildstream/_cas/transfer.py             11
-rw-r--r--   buildstream/sandbox/_sandboxremote.py     4
5 files changed, 166 insertions(+), 141 deletions(-)
diff --git a/buildstream/_artifactcache.py b/buildstream/_artifactcache.py
index 21db707f1..46280412c 100644
--- a/buildstream/_artifactcache.py
+++ b/buildstream/_artifactcache.py
@@ -29,7 +29,7 @@ from . import utils
from . import _yaml
from ._cas import BlobNotFound, CASRemote, CASRemoteSpec
-from ._cas.transfer import cas_directory_download, cas_tree_download
+from ._cas.transfer import cas_directory_upload, cas_directory_download, cas_tree_download
CACHE_SIZE_FILE = "cache_size"
@@ -608,16 +608,41 @@ class ArtifactCache():
for remote in push_remotes:
remote.init()
+ skipped_remote = True
display_key = element._get_brief_display_key()
element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url))
- if self.cas.push(refs, remote):
- element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
+ try:
+ for ref in refs:
+ # Check whether ref is already on the server in which case
+ # there is no need to push the ref
+ root_digest = self.cas.resolve_ref(ref)
+ response = remote.get_reference(ref)
+ if (response is not None and
+ response.hash == root_digest.hash and
+ response.size_bytes == root_digest.size_bytes):
+ element.info("Remote ({}) already has {} cached".format(
+ remote.spec.url, element._get_brief_display_key()))
+ continue
+
+ # upload blobs
+ cas_directory_upload(self.cas, remote, root_digest)
+ remote.update_reference(ref, root_digest)
+
+ skipped_remote = False
+
+ except CASError as e:
+ if str(e.reason) == "StatusCode.RESOURCE_EXHAUSTED":
+ element.warn("Failed to push element to {}: Resource exhuasted"
+ .format(remote.spec.url))
+ continue
+ else:
+ raise ArtifactError("Failed to push refs {}: {}".format(refs, e),
+ temporary=True) from e
+
+ if skipped_remote is False:
pushed = True
- else:
- element.info("Remote ({}) already has {} cached".format(
- remote.spec.url, element._get_brief_display_key()
- ))
+ element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url))
return pushed
@@ -722,7 +747,7 @@ class ArtifactCache():
return
for remote in push_remotes:
- self.cas.push_directory(remote, directory)
+ cas_directory_upload(self.cas, remote, directory.ref)
# push_message():
#
diff --git a/buildstream/_cas/cascache.py b/buildstream/_cas/cascache.py
index e3b0332be..7ea46a090 100644
--- a/buildstream/_cas/cascache.py
+++ b/buildstream/_cas/cascache.py
@@ -18,23 +18,16 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
import hashlib
-import itertools
import os
import stat
import tempfile
-import uuid
import contextlib
-import grpc
-
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
-from .._protos.buildstream.v2 import buildstream_pb2
from .. import utils
from .._exceptions import CASCacheError
-from .casremote import _CASBatchUpdate
-
# A CASCache manages a CAS repository as specified in the Remote Execution API.
#
@@ -196,73 +189,6 @@ class CASCache():
self.set_ref(newref, tree)
- # push():
- #
- # Push committed refs to remote repository.
- #
- # Args:
- # refs (list): The refs to push
- # remote (CASRemote): The remote to push to
- #
- # Returns:
- # (bool): True if any remote was updated, False if no pushes were required
- #
- # Raises:
- # (CASCacheError): if there was an error
- #
- def push(self, refs, remote):
- skipped_remote = True
- try:
- for ref in refs:
- tree = self.resolve_ref(ref)
-
- # Check whether ref is already on the server in which case
- # there is no need to push the ref
- try:
- request = buildstream_pb2.GetReferenceRequest(instance_name=remote.spec.instance_name)
- request.key = ref
- response = remote.ref_storage.GetReference(request)
-
- if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
- # ref is already on the server with the same tree
- continue
-
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.NOT_FOUND:
- # Intentionally re-raise RpcError for outer except block.
- raise
-
- self._send_directory(remote, tree)
-
- request = buildstream_pb2.UpdateReferenceRequest(instance_name=remote.spec.instance_name)
- request.keys.append(ref)
- request.digest.hash = tree.hash
- request.digest.size_bytes = tree.size_bytes
- remote.ref_storage.UpdateReference(request)
-
- skipped_remote = False
- except grpc.RpcError as e:
- if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
- raise CASCacheError("Failed to push ref {}: {}".format(refs, e), temporary=True) from e
-
- return not skipped_remote
-
- # push_directory():
- #
- # Push the given virtual directory to a remote.
- #
- # Args:
- # remote (CASRemote): The remote to push to
- # directory (Directory): A virtual directory object to push.
- #
- # Raises:
- # (CASCacheError): if there was an error
- #
- def push_directory(self, remote, directory):
- remote.init()
-
- self._send_directory(remote, directory.ref)
-
# objpath():
#
# Return the path of an object based on its digest.
@@ -534,6 +460,27 @@ class CASCache():
else:
return None
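+ # yield_directory_digests():
+ #
+ # Iterate over the digest of a directory and the digests of every
+ # blob it references, recursing into subdirectories.
+ #
+ # Args:
+ # directory_digest (Digest): digest of the directory to walk
+ #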
+ def yield_directory_digests(self, directory_digest):
+ # yield the directory's own digest, then parse it and yield
+ # the digests of its files and subdirectories
+ d = remote_execution_pb2.Digest()
+ d.hash = directory_digest.hash
+ d.size_bytes = directory_digest.size_bytes
+ yield d
+
+ directory = remote_execution_pb2.Directory()
+
+ with open(self.objpath(directory_digest), 'rb') as f:
+ directory.ParseFromString(f.read())
+
+ for filenode in directory.files:
+ d = remote_execution_pb2.Digest()
+ d.hash = filenode.digest.hash
+ d.size_bytes = filenode.digest.size_bytes
+ yield d
+
+ for dirnode in directory.directories:
+ yield from self.yield_directory_digests(dirnode.digest)
+
################################################
# Local Private Methods #
################################################
@@ -722,57 +669,3 @@ class CASCache():
for dirnode in directory.directories:
yield from self._required_blobs(dirnode.digest)
-
- def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
- required_blobs = self._required_blobs(digest)
-
- missing_blobs = dict()
- # Limit size of FindMissingBlobs request
- for required_blobs_group in _grouper(required_blobs, 512):
- request = remote_execution_pb2.FindMissingBlobsRequest(instance_name=remote.spec.instance_name)
-
- for required_digest in required_blobs_group:
- d = request.blob_digests.add()
- d.hash = required_digest.hash
- d.size_bytes = required_digest.size_bytes
-
- response = remote.cas.FindMissingBlobs(request)
- for missing_digest in response.missing_blob_digests:
- d = remote_execution_pb2.Digest()
- d.hash = missing_digest.hash
- d.size_bytes = missing_digest.size_bytes
- missing_blobs[d.hash] = d
-
- # Upload any blobs missing on the server
- self._send_blobs(remote, missing_blobs.values(), u_uid)
-
- def _send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
- batch = _CASBatchUpdate(remote)
-
- for digest in digests:
- with open(self.objpath(digest), 'rb') as f:
- assert os.fstat(f.fileno()).st_size == digest.size_bytes
-
- if (digest.size_bytes >= remote.max_batch_total_size_bytes or
- not remote.batch_update_supported):
- # Too large for batch request, upload in independent request.
- remote._send_blob(digest, f, u_uid=u_uid)
- else:
- if not batch.add(digest, f):
- # Not enough space left in batch request.
- # Complete pending batch first.
- batch.send()
- batch = _CASBatchUpdate(remote)
- batch.add(digest, f)
-
- # Send final batch
- batch.send()
-
-
-def _grouper(iterable, n):
- while True:
- try:
- current = next(iterable)
- except StopIteration:
- return
- yield itertools.chain([current], itertools.islice(iterable, n - 1))
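A minimal sketch of consuming the new generator, assuming cas is a CASCache and root is the digest of a directory already stored in it (both names hypothetical):

# Digests are yielded root-first, then the directory's files, then
# each subdirectory, depth-first.
for digest in cas.yield_directory_digests(root):
    print(digest.hash, digest.size_bytes)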
diff --git a/buildstream/_cas/casremote.py b/buildstream/_cas/casremote.py
index 0e75b0936..8435230fc 100644
--- a/buildstream/_cas/casremote.py
+++ b/buildstream/_cas/casremote.py
@@ -1,5 +1,6 @@
from collections import namedtuple
import io
+import itertools
import os
import multiprocessing
import signal
@@ -288,6 +289,18 @@ class CASRemote():
else:
return None
+ # update_reference():
+ #
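+ # Update a ref on the remote to point to the given digest.
+ #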
+ # Args:
+ # ref (str): Reference to update
+ # digest (Digest): New digest to update ref with
+ def update_reference(self, ref, digest):
+ request = buildstream_pb2.UpdateReferenceRequest()
+ request.keys.append(ref)
+ request.digest.hash = digest.hash
+ request.digest.size_bytes = digest.size_bytes
+ self.ref_storage.UpdateReference(request)
+
def get_tree_blob(self, tree_digest):
self.init()
f = tempfile.NamedTemporaryFile(dir=self.tmpdir)
@@ -397,6 +410,68 @@ class CASRemote():
while self.__tmp_downloads:
yield self.__tmp_downloads.pop()
+ # upload_blob():
+ #
+ # Push a single blob to the remote, batching it with other queued
+ # blobs where possible and falling back to an individual bytestream
+ # write when the blob is too large for a batch request.
+ #
+ # Args:
+ # digest (Digest): digest of the blob to upload
+ # blob_file (str): path to the file containing the blob
+ # u_uid (str): id used to identify the client to the bytestream service
+ #
+ def upload_blob(self, digest, blob_file, u_uid=uuid.uuid4()):
+ with open(blob_file, 'rb') as f:
+ assert os.fstat(f.fileno()).st_size == digest.size_bytes
+
+ if (digest.size_bytes >= self.max_batch_total_size_bytes or
+ not self.batch_update_supported):
+ # Too large for batch request, upload in independent request.
+ self._send_blob(digest, f, u_uid=u_uid)
+ else:
+ if self.__batch_update.add(digest, f) is False:
+ self.__batch_update.send()
+ self.__batch_update = _CASBatchUpdate(self)
+ self.__batch_update.add(digest, f)
+
+ # send_update_batch():
+ #
+ # Sends anything left in the update batch
+ #
+ def send_update_batch(self):
+ # make sure everything is sent
+ self.__batch_update.send()
+ self.__batch_update = _CASBatchUpdate(self)
+
+ # find_missing_blobs():
+ #
+ # Makes a FindMissingBlobs request to the remote to determine which
+ # of the required blobs it is missing.
+ #
+ # Args:
+ # required_blobs ([Digest]): list of required blob digests
+ #
+ # Returns:
+ # (dict): missing blob Digests, keyed by digest hash
+ def find_missing_blobs(self, required_blobs):
+ self.init()
+ missing_blobs = dict()
+ # Limit size of FindMissingBlobs request
+ for required_blobs_group in _grouper(required_blobs, 512):
+ request = remote_execution_pb2.FindMissingBlobsRequest()
+
+ for required_digest in required_blobs_group:
+ d = request.blob_digests.add()
+ d.hash = required_digest.hash
+ d.size_bytes = required_digest.size_bytes
+
+ response = self.cas.FindMissingBlobs(request)
+ for missing_digest in response.missing_blob_digests:
+ d = remote_execution_pb2.Digest()
+ d.hash = missing_digest.hash
+ d.size_bytes = missing_digest.size_bytes
+ missing_blobs[d.hash] = d
+
+ return missing_blobs
+
################################################
# Local Private Methods #
################################################
@@ -435,7 +510,10 @@ class CASRemote():
offset += chunk_size
finished = request.finish_write
- response = self.bytestream.Write(request_stream(resource_name, stream))
+ try:
+ response = self.bytestream.Write(request_stream(resource_name, stream))
+ except grpc.RpcError as e:
+ raise CASRemoteError("Failed to upload blob: {}".format(e), reason=e.code())
assert response.committed_size == digest.size_bytes
@@ -449,6 +527,15 @@ class CASRemote():
self.__batch_read = _CASBatchRead(self)
+def _grouper(iterable, n):
+ while True:
+ try:
+ current = next(iterable)
+ except StopIteration:
+ return
+ yield itertools.chain([current], itertools.islice(iterable, n - 1))
+
+
# Represents a batch of blobs queued for fetching.
#
class _CASBatchRead():
@@ -480,7 +567,11 @@ class _CASBatchRead():
if not self._request.digests:
return
- batch_response = self._remote.cas.BatchReadBlobs(self._request)
+ try:
+ batch_response = self._remote.cas.BatchReadBlobs(self._request)
+ except grpc.RpcError as e:
+ raise CASRemoteError("Failed to read blob batch: {}".format(e),
+ reason=e.code()) from e
for response in batch_response.responses:
if response.status.code == code_pb2.NOT_FOUND:
@@ -528,7 +619,12 @@ class _CASBatchUpdate():
if not self._request.requests:
return
- batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+ # Translate gRPC failures into a CASRemoteError so callers see a
+ # BuildStream exception rather than a raw grpc.RpcError.
+ try:
+ batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+ except grpc.RpcError as e:
+ raise CASRemoteError("Failed to upload blob batch: {}".format(e),
+ reason=e.code()) from e
for response in batch_response.responses:
if response.status.code != code_pb2.OK:
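The relocated _grouper helper yields lazy groups backed by a single shared iterator, so each group must be fully consumed before the next one is requested; a standalone sketch of its behaviour (example values are illustrative):

import itertools

def _grouper(iterable, n):
    while True:
        try:
            current = next(iterable)
        except StopIteration:
            return
        yield itertools.chain([current], itertools.islice(iterable, n - 1))

for group in _grouper(iter(range(7)), 3):
    print(list(group))   # [0, 1, 2], then [3, 4, 5], then [6]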
diff --git a/buildstream/_cas/transfer.py b/buildstream/_cas/transfer.py
index 5eaaf0920..c1293c022 100644
--- a/buildstream/_cas/transfer.py
+++ b/buildstream/_cas/transfer.py
@@ -49,3 +49,14 @@ def cas_tree_download(caslocal, casremote, tree_digest):
# get root digest from tree and return that
return _message_digest(tree.root.SerializeToString())
+
+
+def cas_directory_upload(caslocal, casremote, root_digest):
+ required_blobs = caslocal.yield_directory_digests(root_digest)
+ missing_blobs = casremote.find_missing_blobs(required_blobs)
+ for blob in missing_blobs.values():
+ blob_file = caslocal.objpath(blob)
+ casremote.upload_blob(blob, blob_file)
+
+ # send remaining blobs
+ casremote.send_update_batch()
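Callers use this helper in place of the removed CASCache.push_directory; a hedged sketch matching the call sites in this patch (cascache, casremote and upload_vdir assumed to be set up as in _sandboxremote.py below):

from buildstream._cas.transfer import cas_directory_upload

# Push everything reachable from the virtual directory's root digest;
# the helper computes the missing blobs, uploads them in batches, and
# flushes the final partial batch itself.
cas_directory_upload(cascache, casremote, upload_vdir.ref)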
diff --git a/buildstream/sandbox/_sandboxremote.py b/buildstream/sandbox/_sandboxremote.py
index bea175435..baa5aaaaf 100644
--- a/buildstream/sandbox/_sandboxremote.py
+++ b/buildstream/sandbox/_sandboxremote.py
@@ -39,7 +39,7 @@ from .._exceptions import SandboxError
from .. import _yaml
from .._protos.google.longrunning import operations_pb2, operations_pb2_grpc
from .._cas import CASRemote, CASRemoteSpec
-from .._cas.transfer import cas_tree_download
+from .._cas.transfer import cas_tree_download, cas_directory_upload
class RemoteExecutionSpec(namedtuple('RemoteExecutionSpec', 'exec_service storage_service action_service')):
@@ -345,7 +345,7 @@ class SandboxRemote(Sandbox):
# Now, push that key (without necessarily needing a ref) to the remote.
try:
- cascache.push_directory(casremote, upload_vdir)
+ cas_directory_upload(cascache, casremote, upload_vdir.ref)
except grpc.RpcError as e:
raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e