diff options
author | Jürg Billeter <j@bitron.ch> | 2019-07-02 13:32:12 +0100 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2019-08-20 08:09:52 +0200 |
commit | 2c9ecdc2a849a869d82b08899b0d1d67c9745b55 (patch) | |
tree | 925ba17e2975a778ba3113f436cb1f733b36fa5c /src/buildstream/_cas | |
parent | 2a5e642e042c12a25f12151d5508213cb7f72c8a (diff) | |
download | buildstream-2c9ecdc2a849a869d82b08899b0d1d67c9745b55.tar.gz |
casremote.py: Use UploadMissingBlobs in _send_blob()
Diffstat (limited to 'src/buildstream/_cas')
-rw-r--r-- | src/buildstream/_cas/cascache.py | 9 | ||||
-rw-r--r-- | src/buildstream/_cas/casremote.py | 49 |
2 files changed, 20 insertions, 38 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 71040c5ea..d56ba0f55 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -21,7 +21,6 @@ import itertools import os import stat import errno -import uuid import contextlib import shutil import subprocess @@ -944,7 +943,7 @@ class CASCache(): # remote (CASRemote): The remote repository to upload to # digests (list): The Digests of Blobs to upload # - def send_blobs(self, remote, digests, u_uid=uuid.uuid4()): + def send_blobs(self, remote, digests): batch = _CASBatchUpdate(remote) for digest in digests: @@ -954,7 +953,7 @@ class CASCache(): if (digest.size_bytes >= remote.max_batch_total_size_bytes or not remote.batch_update_supported): # Too large for batch request, upload in independent request. - remote._send_blob(digest, f, u_uid=u_uid) + remote._send_blob(digest) else: if not batch.add(digest, f): # Not enough space left in batch request. @@ -966,11 +965,11 @@ class CASCache(): # Send final batch batch.send() - def _send_directory(self, remote, digest, u_uid=uuid.uuid4()): + def _send_directory(self, remote, digest): missing_blobs = self.remote_missing_blobs_for_directory(remote, digest) # Upload any blobs missing on the server - self.send_blobs(remote, missing_blobs, u_uid) + self.send_blobs(remote, missing_blobs) def _grouper(iterable, n): diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index 229fd4142..cf052a307 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -3,12 +3,11 @@ import os import multiprocessing import signal from urllib.parse import urlparse -import uuid import grpc from .._protos.google.rpc import code_pb2 -from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc +from .._protos.google.bytestream import bytestream_pb2_grpc from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from .._protos.build.buildgrid import local_cas_pb2 from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc @@ -273,37 +272,21 @@ class CASRemote(): raise CASRemoteError("Failed to download blob {}: {}".format( blob_response.digest.hash, blob_response.status.code)) - def _send_blob(self, digest, stream, u_uid=uuid.uuid4()): - if self.instance_name: - resource_name = '/'.join([self.instance_name, 'uploads', str(u_uid), 'blobs', - digest.hash, str(digest.size_bytes)]) - else: - resource_name = '/'.join(['uploads', str(u_uid), 'blobs', - digest.hash, str(digest.size_bytes)]) - - def request_stream(resname, instream): - offset = 0 - finished = False - remaining = digest.size_bytes - while not finished: - chunk_size = min(remaining, _MAX_PAYLOAD_BYTES) - remaining -= chunk_size - - request = bytestream_pb2.WriteRequest() - request.write_offset = offset - # max. _MAX_PAYLOAD_BYTES chunks - request.data = instream.read(chunk_size) - request.resource_name = resname - request.finish_write = remaining <= 0 - - yield request - - offset += chunk_size - finished = request.finish_write - - response = self.bytestream.Write(request_stream(resource_name, stream)) - - assert response.committed_size == digest.size_bytes + def _send_blob(self, digest): + local_cas = self.cascache._get_local_cas() + request = local_cas_pb2.UploadMissingBlobsRequest() + request.instance_name = self.local_cas_instance_name + request_digest = request.blob_digests.add() + request_digest.CopyFrom(digest) + response = local_cas.UploadMissingBlobs(request) + for blob_response in response.responses: + if blob_response.status.code == code_pb2.NOT_FOUND: + raise BlobNotFound(blob_response.digest.hash, "Failed to upload blob {}: {}".format( + blob_response.digest.hash, blob_response.status.code)) + + if blob_response.status.code != code_pb2.OK: + raise CASRemoteError("Failed to upload blob {}: {}".format( + blob_response.digest.hash, blob_response.status.code)) # Represents a batch of blobs queued for fetching. |