Diffstat (limited to 'src/buildstream/_cas/casserver.py')
-rw-r--r-- src/buildstream/_cas/casserver.py | 140
1 file changed, 70 insertions(+), 70 deletions(-)
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index d4241435a..a2110d8a2 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -33,8 +33,14 @@ import click
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.google.rpc import code_pb2
-from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
- artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc
+from .._protos.buildstream.v2 import (
+ buildstream_pb2,
+ buildstream_pb2_grpc,
+ artifact_pb2,
+ artifact_pb2_grpc,
+ source_pb2,
+ source_pb2_grpc,
+)
from .. import utils
from .._exceptions import CASError, CASCacheError
@@ -61,8 +67,8 @@ def create_server(repo, *, enable_push, quota, index_only):
cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)
try:
- artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
- sourcedir = os.path.join(os.path.abspath(repo), 'source_protos')
+ artifactdir = os.path.join(os.path.abspath(repo), "artifacts", "refs")
+ sourcedir = os.path.join(os.path.abspath(repo), "source_protos")
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
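
The max_workers value above mirrors the default ThreadPoolExecutor has used since Python 3.5, (os.cpu_count() or 1) * 5. A minimal sketch of how such a value typically feeds a thread-pool-backed gRPC server follows; the executor wiring is an illustrative assumption, since the diff elides the actual grpc.server() call.

    # Sketch only: how max_workers typically backs a gRPC server.
    import os
    from concurrent import futures
    import grpc

    max_workers = (os.cpu_count() or 1) * 5   # ThreadPoolExecutor's 3.5+ default
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
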
@@ -70,31 +76,31 @@ def create_server(repo, *, enable_push, quota, index_only):
if not index_only:
bytestream_pb2_grpc.add_ByteStreamServicer_to_server(
- _ByteStreamServicer(cas, enable_push=enable_push), server)
+ _ByteStreamServicer(cas, enable_push=enable_push), server
+ )
remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
- _ContentAddressableStorageServicer(cas, enable_push=enable_push), server)
+ _ContentAddressableStorageServicer(cas, enable_push=enable_push), server
+ )
- remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(
- _CapabilitiesServicer(), server)
+ remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(_CapabilitiesServicer(), server)
buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server(
- _ReferenceStorageServicer(cas, enable_push=enable_push), server)
+ _ReferenceStorageServicer(cas, enable_push=enable_push), server
+ )
artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
- _ArtifactServicer(cas, artifactdir, update_cas=not index_only), server)
+ _ArtifactServicer(cas, artifactdir, update_cas=not index_only), server
+ )
- source_pb2_grpc.add_SourceServiceServicer_to_server(
- _SourceServicer(sourcedir), server)
+ source_pb2_grpc.add_SourceServiceServicer_to_server(_SourceServicer(sourcedir), server)
# Create reference storage and artifact capabilities
- artifact_capabilities = buildstream_pb2.ArtifactCapabilities(
- allow_updates=enable_push)
- source_capabilities = buildstream_pb2.SourceCapabilities(
- allow_updates=enable_push)
+ artifact_capabilities = buildstream_pb2.ArtifactCapabilities(allow_updates=enable_push)
+ source_capabilities = buildstream_pb2.SourceCapabilities(allow_updates=enable_push)
buildstream_pb2_grpc.add_CapabilitiesServicer_to_server(
- _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities),
- server)
+ _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities), server
+ )
yield server
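
Since create_server() yields the server rather than returning it, callers use it as a context manager, presumably so cleanup in the surrounding try block runs on exit. A minimal hedged usage sketch (the repository path and port are placeholder assumptions):

    # Hypothetical caller of the create_server() context manager above.
    with create_server("/srv/cas", quota=int(10e9), enable_push=True, index_only=False) as server:
        server.add_insecure_port("[::]:50052")   # example port
        server.start()
        server.wait_for_termination()
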
@@ -103,28 +109,25 @@ def create_server(repo, *, enable_push, quota, index_only):
@click.command(short_help="CAS Artifact Server")
-@click.option('--port', '-p', type=click.INT, required=True, help="Port number")
-@click.option('--server-key', help="Private server key for TLS (PEM-encoded)")
-@click.option('--server-cert', help="Public server certificate for TLS (PEM-encoded)")
-@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)")
-@click.option('--enable-push', is_flag=True,
- help="Allow clients to upload blobs and update artifact cache")
-@click.option('--quota', type=click.INT, default=10e9, show_default=True,
- help="Maximum disk usage in bytes")
-@click.option('--index-only', is_flag=True,
- help="Only provide the BuildStream artifact and source services (\"index\"), not the CAS (\"storage\")")
-@click.argument('repo')
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
- quota, index_only):
+@click.option("--port", "-p", type=click.INT, required=True, help="Port number")
+@click.option("--server-key", help="Private server key for TLS (PEM-encoded)")
+@click.option("--server-cert", help="Public server certificate for TLS (PEM-encoded)")
+@click.option("--client-certs", help="Public client certificates for TLS (PEM-encoded)")
+@click.option("--enable-push", is_flag=True, help="Allow clients to upload blobs and update artifact cache")
+@click.option("--quota", type=click.INT, default=10e9, show_default=True, help="Maximum disk usage in bytes")
+@click.option(
+ "--index-only",
+ is_flag=True,
+ help='Only provide the BuildStream artifact and source services ("index"), not the CAS ("storage")',
+)
+@click.argument("repo")
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push, quota, index_only):
# Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception,
# properly executing cleanup code in `finally` clauses and context managers.
# This is required to terminate buildbox-casd on SIGTERM.
signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0))
- with create_server(repo,
- quota=quota,
- enable_push=enable_push,
- index_only=index_only) as server:
+ with create_server(repo, quota=quota, enable_push=enable_push, index_only=index_only) as server:
use_tls = bool(server_key)
@@ -138,23 +141,25 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push,
if use_tls:
# Read public/private key pair
- with open(server_key, 'rb') as f:
+ with open(server_key, "rb") as f:
server_key_bytes = f.read()
- with open(server_cert, 'rb') as f:
+ with open(server_cert, "rb") as f:
server_cert_bytes = f.read()
if client_certs:
- with open(client_certs, 'rb') as f:
+ with open(client_certs, "rb") as f:
client_certs_bytes = f.read()
else:
client_certs_bytes = None
- credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)],
- root_certificates=client_certs_bytes,
- require_client_auth=bool(client_certs))
- server.add_secure_port('[::]:{}'.format(port), credentials)
+ credentials = grpc.ssl_server_credentials(
+ [(server_key_bytes, server_cert_bytes)],
+ root_certificates=client_certs_bytes,
+ require_client_auth=bool(client_certs),
+ )
+ server.add_secure_port("[::]:{}".format(port), credentials)
else:
- server.add_insecure_port('[::]:{}'.format(port))
+ server.add_insecure_port("[::]:{}".format(port))
# Run artifact server
server.start()
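
The TLS branch follows the stock grpc-python recipe: one (private key, certificate chain) pair for the server, plus an optional client CA bundle that, when present, turns on mutual TLS via require_client_auth. A self-contained sketch under assumed file names:

    # Standalone sketch of the TLS setup above; file names are assumptions.
    import grpc

    with open("server.key", "rb") as f:
        key_bytes = f.read()
    with open("server.crt", "rb") as f:
        cert_bytes = f.read()

    credentials = grpc.ssl_server_credentials(
        [(key_bytes, cert_bytes)],
        root_certificates=None,     # pass a CA bundle here to verify clients
        require_client_auth=False,  # True enforces mutual TLS
    )
    # server.add_secure_port("[::]:12345", credentials)
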
@@ -183,7 +188,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
return
try:
- with open(self.cas.objpath(client_digest), 'rb') as f:
+ with open(self.cas.objpath(client_digest), "rb") as f:
if os.fstat(f.fileno()).st_size != client_digest.size_bytes:
context.set_code(grpc.StatusCode.NOT_FOUND)
return
@@ -317,7 +322,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres
blob_response.digest.size_bytes = digest.size_bytes
try:
objpath = self.cas.objpath(digest)
- with open(objpath, 'rb') as f:
+ with open(objpath, "rb") as f:
if os.fstat(f.fileno()).st_size != digest.size_bytes:
blob_response.status.code = code_pb2.NOT_FOUND
continue
@@ -437,7 +442,6 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
-
def __init__(self, cas, artifactdir, *, update_cas=True):
super().__init__()
self.cas = cas
@@ -451,7 +455,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
context.abort(grpc.StatusCode.NOT_FOUND, "Artifact proto not found")
artifact = artifact_pb2.Artifact()
- with open(artifact_path, 'rb') as f:
+ with open(artifact_path, "rb") as f:
artifact.ParseFromString(f.read())
# Artifact-only servers will not have blobs on their system,
@@ -489,11 +493,9 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
except FileNotFoundError:
os.unlink(artifact_path)
- context.abort(grpc.StatusCode.NOT_FOUND,
- "Artifact files incomplete")
+ context.abort(grpc.StatusCode.NOT_FOUND, "Artifact files incomplete")
except DecodeError:
- context.abort(grpc.StatusCode.NOT_FOUND,
- "Artifact files not valid")
+ context.abort(grpc.StatusCode.NOT_FOUND, "Artifact files not valid")
return artifact
@@ -516,7 +518,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
# Add the artifact proto to the cas
artifact_path = os.path.join(self.artifactdir, request.cache_key)
os.makedirs(os.path.dirname(artifact_path), exist_ok=True)
- with utils.save_file_atomic(artifact_path, mode='wb') as f:
+ with utils.save_file_atomic(artifact_path, mode="wb") as f:
f.write(artifact.SerializeToString())
return artifact
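
utils.save_file_atomic is a BuildStream helper; its value here is that a concurrently reading client can never observe a half-written artifact proto. A hedged sketch of the general write-to-temp-then-rename pattern such helpers implement (this is not BuildStream's actual implementation):

    # Illustrative write-temp-then-rename pattern, not utils.save_file_atomic itself.
    import os
    import tempfile

    def save_atomic(path, data):
        fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path))
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(data)
            os.replace(tmp, path)   # atomic on POSIX filesystems
        except BaseException:
            os.unlink(tmp)
            raise
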
@@ -527,19 +529,18 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
def _check_directory(self, name, digest, context):
try:
directory = remote_execution_pb2.Directory()
- with open(self.cas.objpath(digest), 'rb') as f:
+ with open(self.cas.objpath(digest), "rb") as f:
directory.ParseFromString(f.read())
except FileNotFoundError:
- context.abort(grpc.StatusCode.FAILED_PRECONDITION,
- "Artifact {} specified but no files found".format(name))
+ context.abort(grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but no files found".format(name))
except DecodeError:
- context.abort(grpc.StatusCode.FAILED_PRECONDITION,
- "Artifact {} specified but directory not found".format(name))
+ context.abort(
+ grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but directory not found".format(name)
+ )
def _check_file(self, name, digest, context):
if not os.path.exists(self.cas.objpath(digest)):
- context.abort(grpc.StatusCode.FAILED_PRECONDITION,
- "Artifact {} specified but not found".format(name))
+ context.abort(grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but not found".format(name))
class _BuildStreamCapabilitiesServicer(buildstream_pb2_grpc.CapabilitiesServicer):
@@ -564,8 +565,7 @@ class _SourceServicer(source_pb2_grpc.SourceServiceServicer):
except FileNotFoundError:
context.abort(grpc.StatusCode.NOT_FOUND, "Source not found")
except DecodeError:
- context.abort(grpc.StatusCode.NOT_FOUND,
- "Sources gives invalid directory")
+ context.abort(grpc.StatusCode.NOT_FOUND, "Sources gives invalid directory")
return source_proto
@@ -576,7 +576,7 @@ class _SourceServicer(source_pb2_grpc.SourceServiceServicer):
def _get_source(self, cache_key):
path = os.path.join(self.sourcedir, cache_key)
source_proto = source_pb2.Source()
- with open(path, 'r+b') as f:
+ with open(path, "r+b") as f:
source_proto.ParseFromString(f.read())
os.utime(path)
return source_proto
@@ -584,18 +584,18 @@ class _SourceServicer(source_pb2_grpc.SourceServiceServicer):
def _set_source(self, cache_key, source_proto):
path = os.path.join(self.sourcedir, cache_key)
os.makedirs(os.path.dirname(path), exist_ok=True)
- with utils.save_file_atomic(path, 'w+b') as f:
+ with utils.save_file_atomic(path, "w+b") as f:
f.write(source_proto.SerializeToString())
def _digest_from_download_resource_name(resource_name):
- parts = resource_name.split('/')
+ parts = resource_name.split("/")
# Accept requests from non-conforming BuildStream 1.1.x clients
if len(parts) == 2:
- parts.insert(0, 'blobs')
+ parts.insert(0, "blobs")
- if len(parts) != 3 or parts[0] != 'blobs':
+ if len(parts) != 3 or parts[0] != "blobs":
return None
try:
@@ -608,15 +608,15 @@ def _digest_from_download_resource_name(resource_name):
def _digest_from_upload_resource_name(resource_name):
- parts = resource_name.split('/')
+ parts = resource_name.split("/")
# Accept requests from non-conforming BuildStream 1.1.x clients
if len(parts) == 2:
- parts.insert(0, 'uploads')
+ parts.insert(0, "uploads")
parts.insert(1, str(uuid.uuid4()))
- parts.insert(2, 'blobs')
+ parts.insert(2, "blobs")
- if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs':
+ if len(parts) < 5 or parts[0] != "uploads" or parts[2] != "blobs":
return None
try:
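
Both helpers parse ByteStream resource names following the Remote Execution API convention: downloads are blobs/{hash}/{size} and uploads are uploads/{uuid}/blobs/{hash}/{size}, with the shims above re-inserting the leading segments that non-conforming BuildStream 1.1.x clients omit. The diff truncates before the digest is built from the parsed segments; a hedged sketch of what that final step presumably looks like (the Digest construction is an assumption based on the REAPI message layout):

    # Assumed completion of the truncated try block: build a Digest from
    # the trailing "{hash}/{size}" segments, returning None on a bad size.
    from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    def _digest_from_parts(hash_part, size_part):
        try:
            digest = remote_execution_pb2.Digest()
            digest.hash = hash_part
            digest.size_bytes = int(size_part)
            return digest
        except ValueError:
            return None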