author     Jürg Billeter <j@bitron.ch>    2018-08-15 14:13:08 +0000
committer  Jürg Billeter <j@bitron.ch>    2018-08-15 14:13:08 +0000
commit     007624429f22972a9d2d2620cbbbad18411ff4c9 (patch)
tree       a8958e438a2d5c4ef802fcc0f0fd55ff20df6684
parent     76f34a633e43790eab6592c5f1385f00c5ba2e83 (diff)
parent     6a9d737e56077ba735b83fc94040e5707ce10d84 (diff)
Merge branch 'juerg/cas' into 'master'
CAS: Fix resource_name format for blobs

Closes #572

See merge request BuildStream/buildstream!660
-rw-r--r--    buildstream/_artifactcache/cascache.py      8
-rw-r--r--    buildstream/_artifactcache/casserver.py    87
2 files changed, 78 insertions(+), 17 deletions(-)
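
Note: the ByteStream API used for CAS blob transfers expects download resource names of the form blobs/{hash}/{size_bytes} and upload resource names of the form uploads/{uuid}/blobs/{hash}/{size_bytes}; the previous code joined only the hash and size, which conforming servers reject. Below is a minimal sketch of the two formats, not part of the commit, assuming a remote_execution_pb2.Digest with hash and size_bytes fields:

    # Sketch only: conforming ByteStream resource names for CAS blobs.
    import uuid

    def download_resource_name(digest):
        # Read requests address a blob directly by its digest.
        return '/'.join(['blobs', digest.hash, str(digest.size_bytes)])

    def upload_resource_name(digest):
        # Write requests additionally carry a client-generated UUID.
        return '/'.join(['uploads', str(uuid.uuid4()), 'blobs',
                         digest.hash, str(digest.size_bytes)])
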
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 4fea98626..00d09773c 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -24,6 +24,7 @@ import os
import signal
import stat
import tempfile
+import uuid
from urllib.parse import urlparse
import grpc
@@ -309,8 +310,11 @@ class CASCache(ArtifactCache):
# Upload any blobs missing on the server
skipped_remote = False
for digest in missing_blobs.values():
+ uuid_ = uuid.uuid4()
+ resource_name = '/'.join(['uploads', str(uuid_), 'blobs',
+ digest.hash, str(digest.size_bytes)])
+
def request_stream():
- resource_name = os.path.join(digest.hash, str(digest.size_bytes))
with open(self.objpath(digest), 'rb') as f:
assert os.fstat(f.fileno()).st_size == digest.size_bytes
offset = 0
@@ -747,7 +751,7 @@ class CASCache(ArtifactCache):
yield from self._required_blobs(dirnode.digest)
def _fetch_blob(self, remote, digest, out):
- resource_name = os.path.join(digest.hash, str(digest.size_bytes))
+ resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
request = bytestream_pb2.ReadRequest()
request.resource_name = resource_name
request.read_offset = 0
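
For reference, a minimal client-side usage sketch of the corrected download path follows; it is illustrative only, and the import paths are assumed to match BuildStream's generated _protos package rather than taken from this diff:

    # Sketch only: reading a CAS blob over ByteStream using the new
    # 'blobs/{hash}/{size_bytes}' resource name format.
    from buildstream._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc

    def fetch_blob(channel, digest, out):
        # 'channel' is an existing grpc channel to the artifact cache server.
        stub = bytestream_pb2_grpc.ByteStreamStub(channel)
        resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
        request = bytestream_pb2.ReadRequest(resource_name=resource_name,
                                             read_offset=0)
        # Read() is server-streaming; each response carries a chunk of data.
        for response in stub.Read(request):
            out.write(response.data)
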
diff --git a/buildstream/_artifactcache/casserver.py b/buildstream/_artifactcache/casserver.py
index 73e1ac67a..0af65729b 100644
--- a/buildstream/_artifactcache/casserver.py
+++ b/buildstream/_artifactcache/casserver.py
@@ -23,6 +23,7 @@ import os
import signal
import sys
import tempfile
+import uuid
import click
import grpc
@@ -130,12 +131,21 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
def Read(self, request, context):
resource_name = request.resource_name
- client_digest = _digest_from_resource_name(resource_name)
- assert request.read_offset <= client_digest.size_bytes
+ client_digest = _digest_from_download_resource_name(resource_name)
+ if client_digest is None:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return
+
+ if request.read_offset > client_digest.size_bytes:
+ context.set_code(grpc.StatusCode.OUT_OF_RANGE)
+ return
try:
with open(self.cas.objpath(client_digest), 'rb') as f:
- assert os.fstat(f.fileno()).st_size == client_digest.size_bytes
+ if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return
+
if request.read_offset > 0:
f.seek(request.read_offset)
@@ -163,12 +173,18 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
resource_name = None
with tempfile.NamedTemporaryFile(dir=self.cas.tmpdir) as out:
for request in request_iterator:
- assert not finished
- assert request.write_offset == offset
+ if finished or request.write_offset != offset:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
+
if resource_name is None:
# First request
resource_name = request.resource_name
- client_digest = _digest_from_resource_name(resource_name)
+ client_digest = _digest_from_upload_resource_name(resource_name)
+ if client_digest is None:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ return response
+
try:
_clean_up_cache(self.cas, client_digest.size_bytes)
except ArtifactTooLargeException as e:
@@ -177,14 +193,20 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
return response
elif request.resource_name:
# If it is set on subsequent calls, it **must** match the value of the first request.
- assert request.resource_name == resource_name
+ if request.resource_name != resource_name:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
out.write(request.data)
offset += len(request.data)
if request.finish_write:
- assert client_digest.size_bytes == offset
+ if client_digest.size_bytes != offset:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
out.flush()
digest = self.cas.add_object(path=out.name)
- assert digest.hash == client_digest.hash
+ if digest.hash != client_digest.hash:
+ context.set_code(grpc.StatusCode.FAILED_PRECONDITION)
+ return response
finished = True
assert finished
@@ -247,13 +269,48 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
return response
-def _digest_from_resource_name(resource_name):
+def _digest_from_download_resource_name(resource_name):
+ parts = resource_name.split('/')
+
+ # Accept requests from non-conforming BuildStream 1.1.x clients
+ if len(parts) == 2:
+ parts.insert(0, 'blobs')
+
+ if len(parts) != 3 or parts[0] != 'blobs':
+ return None
+
+ try:
+ digest = remote_execution_pb2.Digest()
+ digest.hash = parts[1]
+ digest.size_bytes = int(parts[2])
+ return digest
+ except ValueError:
+ return None
+
+
+def _digest_from_upload_resource_name(resource_name):
parts = resource_name.split('/')
- assert len(parts) == 2
- digest = remote_execution_pb2.Digest()
- digest.hash = parts[0]
- digest.size_bytes = int(parts[1])
- return digest
+
+ # Accept requests from non-conforming BuildStream 1.1.x clients
+ if len(parts) == 2:
+ parts.insert(0, 'uploads')
+ parts.insert(1, str(uuid.uuid4()))
+ parts.insert(2, 'blobs')
+
+ if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
+ return None
+
+ try:
+ uuid_ = uuid.UUID(hex=parts[1])
+ if uuid_.version != 4:
+ return None
+
+ digest = remote_execution_pb2.Digest()
+ digest.hash = parts[3]
+ digest.size_bytes = int(parts[4])
+ return digest
+ except ValueError:
+ return None
def _has_object(cas, digest):