author     Jürg Billeter <j@bitron.ch>  2019-07-02 13:32:12 +0100
committer  Jürg Billeter <j@bitron.ch>  2019-08-20 08:09:52 +0200
commit     2c9ecdc2a849a869d82b08899b0d1d67c9745b55 (patch)
tree       925ba17e2975a778ba3113f436cb1f733b36fa5c /src/buildstream/_cas
parent     2a5e642e042c12a25f12151d5508213cb7f72c8a (diff)
download   buildstream-2c9ecdc2a849a869d82b08899b0d1d67c9745b55.tar.gz
casremote.py: Use UploadMissingBlobs in _send_blob()
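
Previously, _send_blob() streamed blob contents to the remote itself
through the ByteStream Write RPC, building a per-upload resource name
from a uuid. Delegating the transfer to the local CAS server via the
LocalCAS UploadMissingBlobs RPC makes that streaming loop unnecessary
and lets the now-unused u_uid parameter be dropped from send_blobs()
and _send_directory().

The new call pattern, as a minimal sketch (the local_cas stub and the
instance_name and digest variables stand in for what _send_blob()
obtains from its CASCache and remote configuration):

    from buildstream._protos.build.buildgrid import local_cas_pb2
    from buildstream._protos.google.rpc import code_pb2

    # Ask the local CAS server to push the blob to the remote endpoint,
    # rather than streaming the bytes over ByteStream ourselves.
    request = local_cas_pb2.UploadMissingBlobsRequest()
    request.instance_name = instance_name
    request.blob_digests.add().CopyFrom(digest)

    response = local_cas.UploadMissingBlobs(request)
    for blob_response in response.responses:
        if blob_response.status.code != code_pb2.OK:
            # NOT_FOUND here means the blob is absent from the local
            # cache; any other non-OK status is a remote-side failure.
            raise RuntimeError("Failed to upload blob {}: {}".format(
                blob_response.digest.hash, blob_response.status.code))

As the RPC name suggests, the local CAS server presumably only
transfers blobs the remote is actually missing, which the old direct
Write path could not do on its own.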
Diffstat (limited to 'src/buildstream/_cas')
-rw-r--r--  src/buildstream/_cas/cascache.py   |  9
-rw-r--r--  src/buildstream/_cas/casremote.py  | 49
2 files changed, 20 insertions(+), 38 deletions(-)
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 71040c5ea..d56ba0f55 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -21,7 +21,6 @@ import itertools
import os
import stat
import errno
-import uuid
import contextlib
import shutil
import subprocess
@@ -944,7 +943,7 @@ class CASCache():
# remote (CASRemote): The remote repository to upload to
# digests (list): The Digests of Blobs to upload
#
- def send_blobs(self, remote, digests, u_uid=uuid.uuid4()):
+ def send_blobs(self, remote, digests):
batch = _CASBatchUpdate(remote)
for digest in digests:
@@ -954,7 +953,7 @@ class CASCache():
if (digest.size_bytes >= remote.max_batch_total_size_bytes or
not remote.batch_update_supported):
# Too large for batch request, upload in independent request.
- remote._send_blob(digest, f, u_uid=u_uid)
+ remote._send_blob(digest)
else:
if not batch.add(digest, f):
# Not enough space left in batch request.
@@ -966,11 +965,11 @@ class CASCache():
# Send final batch
batch.send()
- def _send_directory(self, remote, digest, u_uid=uuid.uuid4()):
+ def _send_directory(self, remote, digest):
missing_blobs = self.remote_missing_blobs_for_directory(remote, digest)
# Upload any blobs missing on the server
- self.send_blobs(remote, missing_blobs, u_uid)
+ self.send_blobs(remote, missing_blobs)
def _grouper(iterable, n):
diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py
index 229fd4142..cf052a307 100644
--- a/src/buildstream/_cas/casremote.py
+++ b/src/buildstream/_cas/casremote.py
@@ -3,12 +3,11 @@ import os
import multiprocessing
import signal
from urllib.parse import urlparse
-import uuid
import grpc
from .._protos.google.rpc import code_pb2
-from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
+from .._protos.google.bytestream import bytestream_pb2_grpc
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.build.buildgrid import local_cas_pb2
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc
@@ -273,37 +272,21 @@ class CASRemote():
raise CASRemoteError("Failed to download blob {}: {}".format(
blob_response.digest.hash, blob_response.status.code))
- def _send_blob(self, digest, stream, u_uid=uuid.uuid4()):
- if self.instance_name:
- resource_name = '/'.join([self.instance_name, 'uploads', str(u_uid), 'blobs',
- digest.hash, str(digest.size_bytes)])
- else:
- resource_name = '/'.join(['uploads', str(u_uid), 'blobs',
- digest.hash, str(digest.size_bytes)])
-
- def request_stream(resname, instream):
- offset = 0
- finished = False
- remaining = digest.size_bytes
- while not finished:
- chunk_size = min(remaining, _MAX_PAYLOAD_BYTES)
- remaining -= chunk_size
-
- request = bytestream_pb2.WriteRequest()
- request.write_offset = offset
- # max. _MAX_PAYLOAD_BYTES chunks
- request.data = instream.read(chunk_size)
- request.resource_name = resname
- request.finish_write = remaining <= 0
-
- yield request
-
- offset += chunk_size
- finished = request.finish_write
-
- response = self.bytestream.Write(request_stream(resource_name, stream))
-
- assert response.committed_size == digest.size_bytes
+ def _send_blob(self, digest):
+ local_cas = self.cascache._get_local_cas()
+ request = local_cas_pb2.UploadMissingBlobsRequest()
+ request.instance_name = self.local_cas_instance_name
+ request_digest = request.blob_digests.add()
+ request_digest.CopyFrom(digest)
+ response = local_cas.UploadMissingBlobs(request)
+ for blob_response in response.responses:
+ if blob_response.status.code == code_pb2.NOT_FOUND:
+ raise BlobNotFound(blob_response.digest.hash, "Failed to upload blob {}: {}".format(
+ blob_response.digest.hash, blob_response.status.code))
+
+ if blob_response.status.code != code_pb2.OK:
+ raise CASRemoteError("Failed to upload blob {}: {}".format(
+ blob_response.digest.hash, blob_response.status.code))
# Represents a batch of blobs queued for fetching.