summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildstream/_artifactcache/cascache.py84
1 files changed, 73 insertions, 11 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index b40b07409..4f0d10da5 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -49,6 +49,46 @@ from . import ArtifactCache
_MAX_PAYLOAD_BYTES = 1024 * 1024
+class _Attempt():
+
+ def __init__(self, last_attempt=False):
+ self.__passed = None
+ self.__last_attempt = last_attempt
+
+ def passed(self):
+ return self.__passed
+
+ def __enter__(self):
+ pass
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ try:
+ if exc_type is None:
+ self.__passed = True
+ else:
+ self.__passed = False
+ if exc_value is not None:
+ raise exc_value
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.UNAVAILABLE:
+ return not self.__last_attempt
+ elif e.code() == grpc.StatusCode.ABORTED:
+ raise CASRemoteError("grpc aborted: {}".format(str(e)),
+ detail=e.details(),
+ temporary=True) from e
+ else:
+ return False
+ return False
+
+
+def _retry(tries=5):
+ for a in range(tries):
+ attempt = _Attempt(last_attempt=(a == tries - 1))
+ yield attempt
+ if attempt.passed():
+ break
+
+
class BlobNotFound(ArtifactError):
def __init__(self, blob, msg):
@@ -248,7 +288,9 @@ class CASCache(ArtifactCache):
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
- response = remote.ref_storage.GetReference(request)
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.GetReference(request)
tree = remote_execution_pb2.Digest()
tree.hash = response.digest.hash
@@ -296,7 +338,9 @@ class CASCache(ArtifactCache):
try:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
- response = remote.ref_storage.GetReference(request)
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.GetReference(request)
if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
@@ -313,7 +357,9 @@ class CASCache(ArtifactCache):
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
- remote.ref_storage.UpdateReference(request)
+ for attempt in _retry():
+ with attempt:
+ remote.ref_storage.UpdateReference(request)
skipped_remote = False
except grpc.RpcError as e:
@@ -786,7 +832,9 @@ class CASCache(ArtifactCache):
remote.init()
request = buildstream_pb2.StatusRequest()
- response = remote.ref_storage.Status(request)
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.Status(request)
if remote_spec.push and not response.allow_updates:
q.put('Artifact server does not allow push')
@@ -986,7 +1034,9 @@ class CASCache(ArtifactCache):
offset += chunk_size
finished = request.finish_write
- response = remote.bytestream.Write(request_stream(resource_name, stream))
+ for attempt in _retry():
+ with attempt:
+ response = remote.bytestream.Write(request_stream(resource_name, stream))
assert response.committed_size == digest.size_bytes
@@ -1003,7 +1053,9 @@ class CASCache(ArtifactCache):
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
- response = remote.cas.FindMissingBlobs(request)
+ for attempt in _retry():
+ with attempt:
+ response = remote.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = missing_digest.hash
@@ -1089,7 +1141,9 @@ class _CASRemote():
self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
try:
request = remote_execution_pb2.GetCapabilitiesRequest()
- response = self.capabilities.GetCapabilities(request)
+ for attempt in _retry():
+ with attempt:
+ response = self.capabilities.GetCapabilities(request)
server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
@@ -1102,7 +1156,9 @@ class _CASRemote():
self.batch_read_supported = False
try:
request = remote_execution_pb2.BatchReadBlobsRequest()
- response = self.cas.BatchReadBlobs(request)
+ for attempt in _retry():
+ with attempt:
+ response = self.cas.BatchReadBlobs(request)
self.batch_read_supported = True
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
@@ -1112,7 +1168,9 @@ class _CASRemote():
self.batch_update_supported = False
try:
request = remote_execution_pb2.BatchUpdateBlobsRequest()
- response = self.cas.BatchUpdateBlobs(request)
+ for attempt in _retry():
+ with attempt:
+ response = self.cas.BatchUpdateBlobs(request)
self.batch_update_supported = True
except grpc.RpcError as e:
if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
@@ -1153,7 +1211,9 @@ class _CASBatchRead():
if len(self._request.digests) == 0:
return
- batch_response = self._remote.cas.BatchReadBlobs(self._request)
+ for attempt in _retry():
+ with attempt:
+ batch_response = self._remote.cas.BatchReadBlobs(self._request)
for response in batch_response.responses:
if response.status.code == grpc.StatusCode.NOT_FOUND.value[0]:
@@ -1201,7 +1261,9 @@ class _CASBatchUpdate():
if len(self._request.requests) == 0:
return
- batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+ for attempt in _retry():
+ with attempt:
+ batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
for response in batch_response.responses:
if response.status.code != grpc.StatusCode.OK.value[0]: