-rw-r--r--  buildstream/_artifactcache/cascache.py  | 84
1 file changed, 73 insertions(+), 11 deletions(-)
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index b40b07409..4f0d10da5 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -49,6 +49,46 @@ from . import ArtifactCache
 _MAX_PAYLOAD_BYTES = 1024 * 1024
 
 
+class _Attempt():
+
+    def __init__(self, last_attempt=False):
+        self.__passed = None
+        self.__last_attempt = last_attempt
+
+    def passed(self):
+        return self.__passed
+
+    def __enter__(self):
+        pass
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        try:
+            if exc_type is None:
+                self.__passed = True
+            else:
+                self.__passed = False
+                if exc_value is not None:
+                    raise exc_value
+        except grpc.RpcError as e:
+            if e.code() == grpc.StatusCode.UNAVAILABLE:
+                return not self.__last_attempt
+            elif e.code() == grpc.StatusCode.ABORTED:
+                raise CASRemoteError("grpc aborted: {}".format(str(e)),
+                                     detail=e.details(),
+                                     temporary=True) from e
+            else:
+                return False
+        return False
+
+
+def _retry(tries=5):
+    for a in range(tries):
+        attempt = _Attempt(last_attempt=(a == tries - 1))
+        yield attempt
+        if attempt.passed():
+            break
+
+
 class BlobNotFound(ArtifactError):
 
     def __init__(self, blob, msg):
@@ -248,7 +288,9 @@ class CASCache(ArtifactCache):
 
                 request = buildstream_pb2.GetReferenceRequest()
                 request.key = ref
-                response = remote.ref_storage.GetReference(request)
+                for attempt in _retry():
+                    with attempt:
+                        response = remote.ref_storage.GetReference(request)
 
                 tree = remote_execution_pb2.Digest()
                 tree.hash = response.digest.hash
@@ -296,7 +338,9 @@ class CASCache(ArtifactCache):
                     try:
                         request = buildstream_pb2.GetReferenceRequest()
                         request.key = ref
-                        response = remote.ref_storage.GetReference(request)
+                        for attempt in _retry():
+                            with attempt:
+                                response = remote.ref_storage.GetReference(request)
 
                         if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
                             # ref is already on the server with the same tree
@@ -313,7 +357,9 @@ class CASCache(ArtifactCache):
                     request.keys.append(ref)
                     request.digest.hash = tree.hash
                     request.digest.size_bytes = tree.size_bytes
-                    remote.ref_storage.UpdateReference(request)
+                    for attempt in _retry():
+                        with attempt:
+                            remote.ref_storage.UpdateReference(request)
 
                     skipped_remote = False
             except grpc.RpcError as e:
@@ -786,7 +832,9 @@ class CASCache(ArtifactCache):
             remote.init()
 
             request = buildstream_pb2.StatusRequest()
-            response = remote.ref_storage.Status(request)
+            for attempt in _retry():
+                with attempt:
+                    response = remote.ref_storage.Status(request)
 
             if remote_spec.push and not response.allow_updates:
                 q.put('Artifact server does not allow push')
@@ -986,7 +1034,9 @@ class CASCache(ArtifactCache):
                 offset += chunk_size
                 finished = request.finish_write
 
-        response = remote.bytestream.Write(request_stream(resource_name, stream))
+        for attempt in _retry():
+            with attempt:
+                response = remote.bytestream.Write(request_stream(resource_name, stream))
 
         assert response.committed_size == digest.size_bytes
 
@@ -1003,7 +1053,9 @@ class CASCache(ArtifactCache):
                 d.hash = required_digest.hash
                 d.size_bytes = required_digest.size_bytes
 
-            response = remote.cas.FindMissingBlobs(request)
+            for attempt in _retry():
+                with attempt:
+                    response = remote.cas.FindMissingBlobs(request)
             for missing_digest in response.missing_blob_digests:
                 d = remote_execution_pb2.Digest()
                 d.hash = missing_digest.hash
@@ -1089,7 +1141,9 @@ class _CASRemote():
             self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
             try:
                 request = remote_execution_pb2.GetCapabilitiesRequest()
-                response = self.capabilities.GetCapabilities(request)
+                for attempt in _retry():
+                    with attempt:
+                        response = self.capabilities.GetCapabilities(request)
                 server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
                 if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
                     self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
@@ -1102,7 +1156,9 @@ class _CASRemote():
             self.batch_read_supported = False
             try:
                 request = remote_execution_pb2.BatchReadBlobsRequest()
-                response = self.cas.BatchReadBlobs(request)
+                for attempt in _retry():
+                    with attempt:
+                        response = self.cas.BatchReadBlobs(request)
                 self.batch_read_supported = True
             except grpc.RpcError as e:
                 if e.code() != grpc.StatusCode.UNIMPLEMENTED:
@@ -1112,7 +1168,9 @@ class _CASRemote():
             self.batch_update_supported = False
             try:
                 request = remote_execution_pb2.BatchUpdateBlobsRequest()
-                response = self.cas.BatchUpdateBlobs(request)
+                for attempt in _retry():
+                    with attempt:
+                        response = self.cas.BatchUpdateBlobs(request)
                 self.batch_update_supported = True
             except grpc.RpcError as e:
                 if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
@@ -1153,7 +1211,9 @@ class _CASBatchRead():
         if len(self._request.digests) == 0:
             return
 
-        batch_response = self._remote.cas.BatchReadBlobs(self._request)
+        for attempt in _retry():
+            with attempt:
+                batch_response = self._remote.cas.BatchReadBlobs(self._request)
 
         for response in batch_response.responses:
             if response.status.code == grpc.StatusCode.NOT_FOUND.value[0]:
@@ -1201,7 +1261,9 @@ class _CASBatchUpdate():
         if len(self._request.requests) == 0:
             return
 
-        batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+        for attempt in _retry():
+            with attempt:
+                batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
 
         for response in batch_response.responses:
             if response.status.code != grpc.StatusCode.OK.value[0]:
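
For readers of this patch, the following is a minimal usage sketch, not part of the change itself. It assumes the private helper _retry() can be imported from buildstream._artifactcache.cascache (an internal module path that may change), and it uses a hypothetical _FlakyStub class in place of a real gRPC stub to simulate a service that is briefly UNAVAILABLE before answering.

import grpc

# Assumption: importing the private helper added by this patch.
from buildstream._artifactcache.cascache import _retry


class _Unavailable(grpc.RpcError):
    # Subclassing grpc.RpcError so _Attempt.__exit__ sees an UNAVAILABLE
    # status code and allows another attempt.
    def code(self):
        return grpc.StatusCode.UNAVAILABLE

    def details(self):
        return "server temporarily unavailable"


class _FlakyStub:
    # Hypothetical stand-in for a gRPC stub: fails twice, then succeeds.
    def __init__(self):
        self.calls = 0

    def GetReference(self, request):
        self.calls += 1
        if self.calls < 3:
            raise _Unavailable()
        return "digest for {}".format(request)


stub = _FlakyStub()

# The call-site pattern used throughout the diff: each iteration yields an
# _Attempt context manager; UNAVAILABLE is swallowed until the last attempt,
# ABORTED becomes a temporary CASRemoteError, anything else propagates.
for attempt in _retry():
    with attempt:
        response = stub.GetReference("example-ref")

print(response, "after", stub.calls, "calls")  # succeeds on the third call

The design keeps every call site a three-line wrapper around the original RPC: the generator bounds the number of attempts, while the context manager decides per exception type whether the failure is retryable.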