diff options
author | Tristan Maat <tristan.maat@codethink.co.uk> | 2019-10-11 15:19:19 +0100 |
---|---|---|
committer | Tristan Maat <tristan.maat@codethink.co.uk> | 2019-11-28 14:37:29 +0000 |
commit | 39a939c73ac8101f3f8825938f63a06fbc4852be (patch) | |
tree | 83eca402ae2518b7cb9947bba885ce8ef7cf9115 | |
parent | 05aaf3eadff2981fe3c16e6c3a808c37f0ca1187 (diff) | |
download | buildstream-39a939c73ac8101f3f8825938f63a06fbc4852be.tar.gz |
casserver.py: Add logging
-rw-r--r-- | src/buildstream/_cas/casserver.py | 77 |
1 files changed, 72 insertions, 5 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index a2110d8a2..06c14601c 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -19,6 +19,8 @@ from concurrent import futures from contextlib import contextmanager +from enum import Enum +import logging import os import signal import sys @@ -47,12 +49,42 @@ from .._exceptions import CASError, CASCacheError from .cascache import CASCache - # The default limit for gRPC messages is 4 MiB. # Limit payload to 1 MiB to leave sufficient headroom for metadata. _MAX_PAYLOAD_BYTES = 1024 * 1024 +# LogLevel(): +# +# Manage log level choices using click. +# +class LogLevel(click.Choice): + # Levels(): + # + # Represents the actual buildbox-casd log level. + # + class Levels(Enum): + WARNING = "warning" + INFO = "info" + TRACE = "trace" + + def __init__(self): + super().__init__([m.lower() for m in LogLevel.Levels._member_names_]) # pylint: disable=no-member + + def convert(self, value, param, ctx) -> "LogLevel.Levels": + return LogLevel.Levels(super().convert(value, param, ctx)) + + @classmethod + def get_logging_equivalent(cls, level) -> int: + equivalents = { + cls.Levels.WARNING: logging.WARNING, + cls.Levels.INFO: logging.INFO, + cls.Levels.TRACE: logging.DEBUG, + } + + return equivalents[level] + + # create_server(): # # Create gRPC CAS artifact server as specified in the Remote Execution API. @@ -62,8 +94,14 @@ _MAX_PAYLOAD_BYTES = 1024 * 1024 # enable_push (bool): Whether to allow blob uploads and artifact updates # index_only (bool): Whether to store CAS blobs or only artifacts # -@contextmanager -def create_server(repo, *, enable_push, quota, index_only): +@contextlib.contextmanager +def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Levels.WARNING): + logger = logging.getLogger("buildstream._cas.casserver") + logger.setLevel(LogLevel.get_logging_equivalent(log_level)) + handler = logging.StreamHandler(sys.stderr) + handler.setFormatter(logging.Formatter(fmt="%(levelname)s: %(funcName)s: %(message)s")) + logger.addHandler(handler) + cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False) try: @@ -120,14 +158,17 @@ def create_server(repo, *, enable_push, quota, index_only): is_flag=True, help='Only provide the BuildStream artifact and source services ("index"), not the CAS ("storage")', ) +@click.option("--log-level", type=LogLevel(), help="The log level to launch with", default="warning") @click.argument("repo") -def server_main(repo, port, server_key, server_cert, client_certs, enable_push, quota, index_only): +def server_main(repo, port, server_key, server_cert, client_certs, enable_push, quota, index_only, log_level): # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception, # properly executing cleanup code in `finally` clauses and context managers. # This is required to terminate buildbox-casd on SIGTERM. signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0)) - with create_server(repo, quota=quota, enable_push=enable_push, index_only=index_only) as server: + with create_server( + repo, quota=quota, enable_push=enable_push, index_only=index_only, log_level=log_level + ) as server: use_tls = bool(server_key) @@ -175,8 +216,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): super().__init__() self.cas = cas self.enable_push = enable_push + self.logger = logging.getLogger("buildstream._cas.casserver") def Read(self, request, context): + self.logger.debug("Reading %s", request.resource_name) resource_name = request.resource_name client_digest = _digest_from_download_resource_name(resource_name) if client_digest is None: @@ -211,6 +254,9 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer): context.set_code(grpc.StatusCode.NOT_FOUND) def Write(self, request_iterator, context): + # Note that we can't easily give more information because the + # data is stuck in an iterator that will be consumed if read. + self.logger.debug("Writing data") response = bytestream_pb2.WriteResponse() if not self.enable_push: @@ -290,8 +336,10 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres super().__init__() self.cas = cas self.enable_push = enable_push + self.logger = logging.getLogger("buildstream._cas.casserver") def FindMissingBlobs(self, request, context): + self.logger.info("Finding '%s'", request.blob_digests) response = remote_execution_pb2.FindMissingBlobsResponse() for digest in request.blob_digests: objpath = self.cas.objpath(digest) @@ -308,6 +356,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres return response def BatchReadBlobs(self, request, context): + self.logger.info("Reading '%s'", request.digests) response = remote_execution_pb2.BatchReadBlobsResponse() batch_size = 0 @@ -336,6 +385,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres return response def BatchUpdateBlobs(self, request, context): + self.logger.info("Updating: '%s'", [request.digest for request in request.requests]) response = remote_execution_pb2.BatchUpdateBlobsResponse() if not self.enable_push: @@ -380,7 +430,11 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddres class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer): + def __init__(self): + self.logger = logging.getLogger("buildstream._cas.casserver") + def GetCapabilities(self, request, context): + self.logger.info("Retrieving capabilities") response = remote_execution_pb2.ServerCapabilities() cache_capabilities = response.cache_capabilities @@ -401,8 +455,10 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): super().__init__() self.cas = cas self.enable_push = enable_push + self.logger = logging.getLogger("buildstream._cas.casserver") def GetReference(self, request, context): + self.logger.debug("'%s'", request.key) response = buildstream_pb2.GetReferenceResponse() try: @@ -422,6 +478,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): return response def UpdateReference(self, request, context): + self.logger.debug("%s -> %s", request.keys, request.digest) response = buildstream_pb2.UpdateReferenceResponse() if not self.enable_push: @@ -434,6 +491,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer): return response def Status(self, request, context): + self.logger.debug("Retrieving status") response = buildstream_pb2.StatusResponse() response.allow_updates = self.enable_push @@ -448,8 +506,10 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): self.artifactdir = artifactdir self.update_cas = update_cas os.makedirs(artifactdir, exist_ok=True) + self.logger = logging.getLogger("buildstream._cas.casserver") def GetArtifact(self, request, context): + self.logger.info("'%s'", request.cache_key) artifact_path = os.path.join(self.artifactdir, request.cache_key) if not os.path.exists(artifact_path): context.abort(grpc.StatusCode.NOT_FOUND, "Artifact proto not found") @@ -500,6 +560,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): return artifact def UpdateArtifact(self, request, context): + self.logger.info("'%s' -> '%s'", request.artifact, request.cache_key) artifact = request.artifact if self.update_cas: @@ -524,6 +585,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): return artifact def ArtifactStatus(self, request, context): + self.logger.info("Retrieving status") return artifact_pb2.ArtifactStatusResponse() def _check_directory(self, name, digest, context): @@ -532,8 +594,10 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer): with open(self.cas.objpath(digest), "rb") as f: directory.ParseFromString(f.read()) except FileNotFoundError: + self.logger.warning("Artifact %s specified but no files found (%s)", name, self.cas.objpath(digest)) context.abort(grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but no files found".format(name)) except DecodeError: + self.logger.warning("Artifact %s specified but directory not found (%s)", name, self.cas.objpath(digest)) context.abort( grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but directory not found".format(name) ) @@ -558,8 +622,10 @@ class _BuildStreamCapabilitiesServicer(buildstream_pb2_grpc.CapabilitiesServicer class _SourceServicer(source_pb2_grpc.SourceServiceServicer): def __init__(self, sourcedir): self.sourcedir = sourcedir + self.logger = logging.getLogger("buildstream._cas.casserver") def GetSource(self, request, context): + self.logger.info("'%s'", request.cache_key) try: source_proto = self._get_source(request.cache_key) except FileNotFoundError: @@ -570,6 +636,7 @@ class _SourceServicer(source_pb2_grpc.SourceServiceServicer): return source_proto def UpdateSource(self, request, context): + self.logger.info("'%s' -> '%s'", request.source, request.cache_key) self._set_source(request.cache_key, request.source) return request.source |