diff options
Diffstat (limited to 'src/buildstream/_cas/casserver.py')
-rw-r--r-- | src/buildstream/_cas/casserver.py | 140 |
1 files changed, 70 insertions, 70 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index d4241435a..a2110d8a2 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -33,8 +33,14 @@ import click from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc from .._protos.google.rpc import code_pb2 -from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \ - artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc +from .._protos.buildstream.v2 import ( + buildstream_pb2, + buildstream_pb2_grpc, + artifact_pb2, + artifact_pb2_grpc, + source_pb2, + source_pb2_grpc, +) from .. import utils from .._exceptions import CASError, CASCacheError @@ -61,8 +67,8 @@ def create_server(repo, *, enable_push, quota, index_only): cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False) try: - artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs') - sourcedir = os.path.join(os.path.abspath(repo), 'source_protos') + artifactdir = os.path.join(os.path.abspath(repo), "artifacts", "refs") + sourcedir = os.path.join(os.path.abspath(repo), "source_protos") # Use max_workers default from Python 3.5+ max_workers = (os.cpu_count() or 1) * 5 @@ -70,31 +76,31 @@ def create_server(repo, *, enable_push, quota, index_only): if not index_only: bytestream_pb2_grpc.add_ByteStreamServicer_to_server( - _ByteStreamServicer(cas, enable_push=enable_push), server) + _ByteStreamServicer(cas, enable_push=enable_push), server + ) remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server( - _ContentAddressableStorageServicer(cas, enable_push=enable_push), server) + _ContentAddressableStorageServicer(cas, enable_push=enable_push), server + ) - remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server( - _CapabilitiesServicer(), server) + remote_execution_pb2_grpc.add_CapabilitiesServicer_to_server(_CapabilitiesServicer(), server) buildstream_pb2_grpc.add_ReferenceStorageServicer_to_server( - _ReferenceStorageServicer(cas, enable_push=enable_push), server) + _ReferenceStorageServicer(cas, enable_push=enable_push), server + ) artifact_pb2_grpc.add_ArtifactServiceServicer_to_server( - _ArtifactServicer(cas, artifactdir, update_cas=not index_only), server) + _ArtifactServicer(cas, artifactdir, update_cas=not index_only), server + ) - source_pb2_grpc.add_SourceServiceServicer_to_server( - _SourceServicer(sourcedir), server) + source_pb2_grpc.add_SourceServiceServicer_to_server(_SourceServicer(sourcedir), server) # Create up reference storage and artifact capabilities - artifact_capabilities = buildstream_pb2.ArtifactCapabilities( - allow_updates=enable_push) - source_capabilities = buildstream_pb2.SourceCapabilities( - allow_updates=enable_push) + artifact_capabilities = buildstream_pb2.ArtifactCapabilities(allow_updates=enable_push) + source_capabilities = buildstream_pb2.SourceCapabilities(allow_updates=enable_push) buildstream_pb2_grpc.add_CapabilitiesServicer_to_server( - _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities), - server) + _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities), server + ) yield server @@ -103,28 +109,25 @@ def create_server(repo, *, enable_push, quota, index_only): @click.command(short_help="CAS Artifact Server") -@click.option('--port', '-p', type=click.INT, required=True, help="Port number") -@click.option('--server-key', help="Private server key for TLS (PEM-encoded)") -@click.option('--server-cert', help="Public server certificate for TLS (PEM-encoded)") -@click.option('--client-certs', help="Public client certificates for TLS (PEM-encoded)") -@click.option('--enable-push', is_flag=True, - help="Allow clients to upload blobs and update artifact cache") -@click.option('--quota', type=click.INT, default=10e9, show_default=True, - help="Maximum disk usage in bytes") -@click.option('--index-only', is_flag=True, - help="Only provide the BuildStream artifact and source services (\"index\"), not the CAS (\"storage\")") -@click.argument('repo') -def server_main(repo, port, server_key, server_cert, client_certs, enable_push, - quota, index_only): +@click.option("--port", "-p", type=click.INT, required=True, help="Port number") +@click.option("--server-key", help="Private server key for TLS (PEM-encoded)") +@click.option("--server-cert", help="Public server certificate for TLS (PEM-encoded)") +@click.option("--client-certs", help="Public client certificates for TLS (PEM-encoded)") +@click.option("--enable-push", is_flag=True, help="Allow clients to upload blobs and update artifact cache") +@click.option("--quota", type=click.INT, default=10e9, show_default=True, help="Maximum disk usage in bytes") +@click.option( + "--index-only", + is_flag=True, + help='Only provide the BuildStream artifact and source services ("index"), not the CAS ("storage")', +) +@click.argument("repo") +def server_main(repo, port, server_key, server_cert, client_certs, enable_push, quota, index_only): # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception, # properly executing cleanup code in `finally` clauses and context managers. # This is required to terminate buildbox-casd on SIGTERM. signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0)) - with create_server(repo, - quota=quota, - enable_push=enable_push, - index_only=index_only) as server: + with create_server(repo, quota=quota, enable_push=enable_push, index_only=index_only) as server: use_tls = bool(server_key) @@ -138,23 +141,25 @@ def server_main(repo, port, server_key, server_cert, client_certs, enable_push, if use_tls: # Read public/private key pair - with open(server_key, 'rb') as f: + with open(server_key, "rb") as f: server_key_bytes = f.read() - with open(server_cert, 'rb') as f: + with open(server_cert, "rb") as f: server_cert_bytes = f.read() if client_certs: - with open(client_certs, 'rb') as f: + with open(client_certs, "rb") as f: client_certs_bytes = f.read() else: client_certs_bytes = None - credentials = grpc.ssl_server_credentials([(server_key_bytes, server_cert_bytes)], - root_certificates=client_certs_bytes, - require_client_auth=bool(client_certs)) - server.add_secure_port('[::]:{}'.format(port), credentials) + credentials = grpc.ssl_server_credentials( + [(server_key_bytes, server_cert_bytes)], + root_certificates=client_certs_bytes, + require_client_auth=bool(client_certs), + ) + server.add_secure_port("[::]:{}".format(port), credentials) else: - server.add_insecure_port('[::]:{}'.format(port)) + server.add_insecure_port("[::]:{}".format(port)) # Run artifact server server.start() @@ -183,7 +188,7 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): return try: - with open(self.cas.objpath(client_digest), 'rb') as f: + with open(self.cas.objpath(client_digest), "rb") as f: if os.fstat(f.fileno()).st_size != client_digest.size_bytes: context.set_code(grpc.StatusCode.NOT_FOUND) return @@ -317,7 +322,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres blob_response.digest.size_bytes = digest.size_bytes try: objpath = self.cas.objpath(digest) - with open(objpath, 'rb') as f: + with open(objpath, "rb") as f: if os.fstat(f.fileno()).st_size != digest.size_bytes: blob_response.status.code = code_pb2.NOT_FOUND continue @@ -437,7 +442,6 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): - def __init__(self, cas, artifactdir, *, update_cas=True): super().__init__() self.cas = cas @@ -451,7 +455,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): context.abort(grpc.StatusCode.NOT_FOUND, "Artifact proto not found") artifact = artifact_pb2.Artifact() - with open(artifact_path, 'rb') as f: + with open(artifact_path, "rb") as f: artifact.ParseFromString(f.read()) # Artifact-only servers will not have blobs on their system, @@ -489,11 +493,9 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): except FileNotFoundError: os.unlink(artifact_path) - context.abort(grpc.StatusCode.NOT_FOUND, - "Artifact files incomplete") + context.abort(grpc.StatusCode.NOT_FOUND, "Artifact files incomplete") except DecodeError: - context.abort(grpc.StatusCode.NOT_FOUND, - "Artifact files not valid") + context.abort(grpc.StatusCode.NOT_FOUND, "Artifact files not valid") return artifact @@ -516,7 +518,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): # Add the artifact proto to the cas artifact_path = os.path.join(self.artifactdir, request.cache_key) os.makedirs(os.path.dirname(artifact_path), exist_ok=True) - with utils.save_file_atomic(artifact_path, mode='wb') as f: + with utils.save_file_atomic(artifact_path, mode="wb") as f: f.write(artifact.SerializeToString()) return artifact @@ -527,19 +529,18 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): def _check_directory(self, name, digest, context): try: directory = remote_execution_pb2.Directory() - with open(self.cas.objpath(digest), 'rb') as f: + with open(self.cas.objpath(digest), "rb") as f: directory.ParseFromString(f.read()) except FileNotFoundError: - context.abort(grpc.StatusCode.FAILED_PRECONDITION, - "Artifact {} specified but no files found".format(name)) + context.abort(grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but no files found".format(name)) except DecodeError: - context.abort(grpc.StatusCode.FAILED_PRECONDITION, - "Artifact {} specified but directory not found".format(name)) + context.abort( + grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but directory not found".format(name) + ) def _check_file(self, name, digest, context): if not os.path.exists(self.cas.objpath(digest)): - context.abort(grpc.StatusCode.FAILED_PRECONDITION, - "Artifact {} specified but not found".format(name)) + context.abort(grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but not found".format(name)) class _BuildStreamCapabilitiesServicer(buildstream_pb2_grpc.CapabilitiesServicer): @@ -564,8 +565,7 @@ class _SourceServicer(source_pb2_grpc.SourceServiceServicer): except FileNotFoundError: context.abort(grpc.StatusCode.NOT_FOUND, "Source not found") except DecodeError: - context.abort(grpc.StatusCode.NOT_FOUND, - "Sources gives invalid directory") + context.abort(grpc.StatusCode.NOT_FOUND, "Sources gives invalid directory") return source_proto @@ -576,7 +576,7 @@ class _SourceServicer(source_pb2_grpc.SourceServiceServicer): def _get_source(self, cache_key): path = os.path.join(self.sourcedir, cache_key) source_proto = source_pb2.Source() - with open(path, 'r+b') as f: + with open(path, "r+b") as f: source_proto.ParseFromString(f.read()) os.utime(path) return source_proto @@ -584,18 +584,18 @@ class _SourceServicer(source_pb2_grpc.SourceServiceServicer): def _set_source(self, cache_key, source_proto): path = os.path.join(self.sourcedir, cache_key) os.makedirs(os.path.dirname(path), exist_ok=True) - with utils.save_file_atomic(path, 'w+b') as f: + with utils.save_file_atomic(path, "w+b") as f: f.write(source_proto.SerializeToString()) def _digest_from_download_resource_name(resource_name): - parts = resource_name.split('/') + parts = resource_name.split("/") # Accept requests from non-conforming BuildStream 1.1.x clients if len(parts) == 2: - parts.insert(0, 'blobs') + parts.insert(0, "blobs") - if len(parts) != 3 or parts[0] != 'blobs': + if len(parts) != 3 or parts[0] != "blobs": return None try: @@ -608,15 +608,15 @@ def _digest_from_download_resource_name(resource_name): def _digest_from_upload_resource_name(resource_name): - parts = resource_name.split('/') + parts = resource_name.split("/") # Accept requests from non-conforming BuildStream 1.1.x clients if len(parts) == 2: - parts.insert(0, 'uploads') + parts.insert(0, "uploads") parts.insert(1, str(uuid.uuid4())) - parts.insert(2, 'blobs') + parts.insert(2, "blobs") - if len(parts) < 5 or parts[0] != 'uploads' or parts[2] != 'blobs': + if len(parts) < 5 or parts[0] != "uploads" or parts[2] != "blobs": return None try: |