summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValentin David <valentin.david@codethink.co.uk>2020-04-16 16:56:27 +0200
committerValentin David <valentin.david@codethink.co.uk>2020-04-16 16:56:27 +0200
commit4f68663ee818b0a35dc59992edfb59292740d23f (patch)
treee94d344a2943f6376cec6f9feca2ec0c73de7b0c
parent8c7c460f523123962d60ff041412591973de30b3 (diff)
downloadbuildstream-valentindavid/bst-1/retry-cas-calls.tar.gz
Handle grpc errors of type UNAVAILABLE and ABORTEDvalentindavid/bst-1/retry-cas-calls
Requests with error UNAVAILABLE are retried directly. They are typically issue with setting connection which can happen when the server is slow at accepting new connections. Requests with errors ABORTED are converted to temporary exceptions so that the scheduler can retry the task. Fixes #815
-rw-r--r--buildstream/_artifactcache/cascache.py84
1 files changed, 73 insertions, 11 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index b40b07409..4f0d10da5 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -49,6 +49,46 @@ from . import ArtifactCache
_MAX_PAYLOAD_BYTES = 1024 * 1024
+class _Attempt():
+
+ def __init__(self, last_attempt=False):
+ self.__passed = None
+ self.__last_attempt = last_attempt
+
+ def passed(self):
+ return self.__passed
+
+ def __enter__(self):
+ pass
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ try:
+ if exc_type is None:
+ self.__passed = True
+ else:
+ self.__passed = False
+ if exc_value is not None:
+ raise exc_value
+ except grpc.RpcError as e:
+ if e.code() == grpc.StatusCode.UNAVAILABLE:
+ return not self.__last_attempt
+ elif e.code() == grpc.StatusCode.ABORTED:
+ raise CASRemoteError("grpc aborted: {}".format(str(e)),
+ detail=e.details(),
+ temporary=True) from e
+ else:
+ return False
+ return False
+
+
+def _retry(tries=5):
+ for a in range(tries):
+ attempt = _Attempt(last_attempt=(a == tries - 1))
+ yield attempt
+ if attempt.passed():
+ break
+
+
class BlobNotFound(ArtifactError):
def __init__(self, blob, msg):
@@ -248,7 +288,9 @@ class CASCache(ArtifactCache):
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
- response = remote.ref_storage.GetReference(request)
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.GetReference(request)
tree = remote_execution_pb2.Digest()
tree.hash = response.digest.hash
@@ -296,7 +338,9 @@ class CASCache(ArtifactCache):
try:
request = buildstream_pb2.GetReferenceRequest()
request.key = ref
- response = remote.ref_storage.GetReference(request)
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.GetReference(request)
if response.digest.hash == tree.hash and response.digest.size_bytes == tree.size_bytes:
# ref is already on the server with the same tree
@@ -313,7 +357,9 @@ class CASCache(ArtifactCache):
request.keys.append(ref)
request.digest.hash = tree.hash
request.digest.size_bytes = tree.size_bytes
- remote.ref_storage.UpdateReference(request)
+ for attempt in _retry():
+ with attempt:
+ remote.ref_storage.UpdateReference(request)
skipped_remote = False
except grpc.RpcError as e:
@@ -786,7 +832,9 @@ class CASCache(ArtifactCache):
remote.init()
request = buildstream_pb2.StatusRequest()
- response = remote.ref_storage.Status(request)
+ for attempt in _retry():
+ with attempt:
+ response = remote.ref_storage.Status(request)
if remote_spec.push and not response.allow_updates:
q.put('Artifact server does not allow push')
@@ -986,7 +1034,9 @@ class CASCache(ArtifactCache):
offset += chunk_size
finished = request.finish_write
- response = remote.bytestream.Write(request_stream(resource_name, stream))
+ for attempt in _retry():
+ with attempt:
+ response = remote.bytestream.Write(request_stream(resource_name, stream))
assert response.committed_size == digest.size_bytes
@@ -1003,7 +1053,9 @@ class CASCache(ArtifactCache):
d.hash = required_digest.hash
d.size_bytes = required_digest.size_bytes
- response = remote.cas.FindMissingBlobs(request)
+ for attempt in _retry():
+ with attempt:
+ response = remote.cas.FindMissingBlobs(request)
for missing_digest in response.missing_blob_digests:
d = remote_execution_pb2.Digest()
d.hash = missing_digest.hash
@@ -1089,7 +1141,9 @@ class _CASRemote():
self.max_batch_total_size_bytes = _MAX_PAYLOAD_BYTES
try:
request = remote_execution_pb2.GetCapabilitiesRequest()
- response = self.capabilities.GetCapabilities(request)
+ for attempt in _retry():
+ with attempt:
+ response = self.capabilities.GetCapabilities(request)
server_max_batch_total_size_bytes = response.cache_capabilities.max_batch_total_size_bytes
if 0 < server_max_batch_total_size_bytes < self.max_batch_total_size_bytes:
self.max_batch_total_size_bytes = server_max_batch_total_size_bytes
@@ -1102,7 +1156,9 @@ class _CASRemote():
self.batch_read_supported = False
try:
request = remote_execution_pb2.BatchReadBlobsRequest()
- response = self.cas.BatchReadBlobs(request)
+ for attempt in _retry():
+ with attempt:
+ response = self.cas.BatchReadBlobs(request)
self.batch_read_supported = True
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.UNIMPLEMENTED:
@@ -1112,7 +1168,9 @@ class _CASRemote():
self.batch_update_supported = False
try:
request = remote_execution_pb2.BatchUpdateBlobsRequest()
- response = self.cas.BatchUpdateBlobs(request)
+ for attempt in _retry():
+ with attempt:
+ response = self.cas.BatchUpdateBlobs(request)
self.batch_update_supported = True
except grpc.RpcError as e:
if (e.code() != grpc.StatusCode.UNIMPLEMENTED and
@@ -1153,7 +1211,9 @@ class _CASBatchRead():
if len(self._request.digests) == 0:
return
- batch_response = self._remote.cas.BatchReadBlobs(self._request)
+ for attempt in _retry():
+ with attempt:
+ batch_response = self._remote.cas.BatchReadBlobs(self._request)
for response in batch_response.responses:
if response.status.code == grpc.StatusCode.NOT_FOUND.value[0]:
@@ -1201,7 +1261,9 @@ class _CASBatchUpdate():
if len(self._request.requests) == 0:
return
- batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
+ for attempt in _retry():
+ with attempt:
+ batch_response = self._remote.cas.BatchUpdateBlobs(self._request)
for response in batch_response.responses:
if response.status.code != grpc.StatusCode.OK.value[0]: