author     Tristan Maat <tristan.maat@codethink.co.uk>  2019-10-11 15:19:19 +0100
committer  Tristan Maat <tristan.maat@codethink.co.uk>  2019-11-28 14:37:29 +0000
commit     39a939c73ac8101f3f8825938f63a06fbc4852be (patch)
tree       83eca402ae2518b7cb9947bba885ce8ef7cf9115
parent     05aaf3eadff2981fe3c16e6c3a808c37f0ca1187 (diff)
casserver.py: Add logging
 src/buildstream/_cas/casserver.py | 77
 1 file changed, 72 insertions(+), 5 deletions(-)
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index a2110d8a2..06c14601c 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -19,6 +19,8 @@
 from concurrent import futures
 from contextlib import contextmanager
+from enum import Enum
+import logging
 import os
 import signal
 import sys
@@ -47,12 +49,42 @@ from .._exceptions import CASError, CASCacheError
 from .cascache import CASCache

-
 # The default limit for gRPC messages is 4 MiB.
 # Limit payload to 1 MiB to leave sufficient headroom for metadata.
 _MAX_PAYLOAD_BYTES = 1024 * 1024
+
+
+# LogLevel():
+#
+# Manage log level choices using click.
+#
+class LogLevel(click.Choice):
+    # Levels():
+    #
+    # Represents the actual buildbox-casd log level.
+    #
+    class Levels(Enum):
+        WARNING = "warning"
+        INFO = "info"
+        TRACE = "trace"
+
+    def __init__(self):
+        # Iterate the enum members directly; their values are already the
+        # lowercase choice strings, so no private attributes are needed.
+        super().__init__([level.value for level in LogLevel.Levels])
+
+    def convert(self, value, param, ctx) -> "LogLevel.Levels":
+        return LogLevel.Levels(super().convert(value, param, ctx))
+
+    @classmethod
+    def get_logging_equivalent(cls, level) -> int:
+        equivalents = {
+            cls.Levels.WARNING: logging.WARNING,
+            cls.Levels.INFO: logging.INFO,
+            cls.Levels.TRACE: logging.DEBUG,
+        }
+
+        return equivalents[level]
+
+
 # create_server():
 #
 # Create gRPC CAS artifact server as specified in the Remote Execution API.
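The LogLevel type above does double duty: click validates the command-line string, convert() turns it into a Levels member, and get_logging_equivalent() maps that member onto the stdlib logging constants. Note that "trace" deliberately maps to logging.DEBUG, since the stdlib defines no TRACE level. A rough sketch of the flow outside of any click command (passing None for param and ctx is illustrative only):

    level = LogLevel().convert("trace", None, None)   # -> LogLevel.Levels.TRACE
    numeric = LogLevel.get_logging_equivalent(level)  # -> logging.DEBUG (10)
    logging.getLogger("buildstream._cas.casserver").setLevel(numeric)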
@@ -62,8 +94,14 @@ _MAX_PAYLOAD_BYTES = 1024 * 1024
 #    enable_push (bool): Whether to allow blob uploads and artifact updates
 #    index_only (bool): Whether to store CAS blobs or only artifacts
+#    log_level (LogLevel.Levels): The log level to run the server at
 #
 @contextmanager
-def create_server(repo, *, enable_push, quota, index_only):
+def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Levels.WARNING):
+    logger = logging.getLogger("buildstream._cas.casserver")
+    logger.setLevel(LogLevel.get_logging_equivalent(log_level))
+
+    # Send log output to stderr, tagging each record with its level and
+    # the function that emitted it.
+    handler = logging.StreamHandler(sys.stderr)
+    handler.setFormatter(logging.Formatter(fmt="%(levelname)s: %(funcName)s: %(message)s"))
+    logger.addHandler(handler)
+
     cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False)

     try:
@@ -120,14 +158,17 @@ def create_server(repo, *, enable_push, quota, index_only):
     is_flag=True,
     help='Only provide the BuildStream artifact and source services ("index"), not the CAS ("storage")',
 )
+@click.option("--log-level", type=LogLevel(), help="The log level to launch with", default="warning")
 @click.argument("repo")
-def server_main(repo, port, server_key, server_cert, client_certs, enable_push, quota, index_only):
+def server_main(repo, port, server_key, server_cert, client_certs, enable_push, quota, index_only, log_level):
     # Handle SIGTERM by calling sys.exit(0), which will raise a SystemExit exception,
     # properly executing cleanup code in `finally` clauses and context managers.
     # This is required to terminate buildbox-casd on SIGTERM.
     signal.signal(signal.SIGTERM, lambda signalnum, frame: sys.exit(0))

-    with create_server(repo, quota=quota, enable_push=enable_push, index_only=index_only) as server:
+    with create_server(
+        repo, quota=quota, enable_push=enable_push, index_only=index_only, log_level=log_level
+    ) as server:

         use_tls = bool(server_key)
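With the option wired through server_main(), verbosity is now chosen at startup. Assuming BuildStream's bst-artifact-server entry point for this module (the port and repository path below are placeholders):

    bst-artifact-server --port 11001 --enable-push --log-level trace /srv/artifacts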
@@ -175,8 +216,10 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.logger = logging.getLogger("buildstream._cas.casserver")

     def Read(self, request, context):
+        self.logger.debug("Reading %s", request.resource_name)
         resource_name = request.resource_name
         client_digest = _digest_from_download_resource_name(resource_name)
         if client_digest is None:
@@ -211,6 +254,9 @@ class _ByteStreamServicer(bytestream_pb2_grpc.ByteStreamServicer):
             context.set_code(grpc.StatusCode.NOT_FOUND)

     def Write(self, request_iterator, context):
+        # We can't easily log more detail here: the payload arrives as an
+        # iterator that would be consumed by inspecting it.
+        self.logger.debug("Writing data")
         response = bytestream_pb2.WriteResponse()

         if not self.enable_push:
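If richer Write() logging were ever needed, the stream could be wrapped rather than consumed. A hypothetical helper, not part of this patch, might look like:

    def _logged_requests(logger, request_iterator):
        # Pass each WriteRequest through, logging chunk sizes as a side effect.
        for request in request_iterator:
            logger.debug("Write chunk: %d bytes at offset %d", len(request.data), request.write_offset)
            yield request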
@@ -290,8 +336,10 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.logger = logging.getLogger("buildstream._cas.casserver")

     def FindMissingBlobs(self, request, context):
+        self.logger.info("Finding '%s'", request.blob_digests)
         response = remote_execution_pb2.FindMissingBlobsResponse()
         for digest in request.blob_digests:
             objpath = self.cas.objpath(digest)
@@ -308,6 +356,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
         return response

     def BatchReadBlobs(self, request, context):
+        self.logger.info("Reading '%s'", request.digests)
         response = remote_execution_pb2.BatchReadBlobsResponse()
         batch_size = 0
@@ -336,6 +385,7 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):
         return response

     def BatchUpdateBlobs(self, request, context):
+        self.logger.info("Updating: '%s'", [r.digest for r in request.requests])
         response = remote_execution_pb2.BatchUpdateBlobsResponse()

         if not self.enable_push:
@@ -380,7 +430,11 @@ class _ContentAddressableStorageServicer(remote_execution_pb2_grpc.ContentAddressableStorageServicer):

 class _CapabilitiesServicer(remote_execution_pb2_grpc.CapabilitiesServicer):
+    def __init__(self):
+        super().__init__()
+        self.logger = logging.getLogger("buildstream._cas.casserver")
+
     def GetCapabilities(self, request, context):
+        self.logger.info("Retrieving capabilities")
         response = remote_execution_pb2.ServerCapabilities()

         cache_capabilities = response.cache_capabilities
@@ -401,8 +455,10 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
         super().__init__()
         self.cas = cas
         self.enable_push = enable_push
+        self.logger = logging.getLogger("buildstream._cas.casserver")

     def GetReference(self, request, context):
+        self.logger.debug("Getting reference '%s'", request.key)
         response = buildstream_pb2.GetReferenceResponse()

         try:
@@ -422,6 +478,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
         return response

     def UpdateReference(self, request, context):
+        self.logger.debug("Updating reference(s) %s -> %s", request.keys, request.digest)
         response = buildstream_pb2.UpdateReferenceResponse()

         if not self.enable_push:
@@ -434,6 +491,7 @@ class _ReferenceStorageServicer(buildstream_pb2_grpc.ReferenceStorageServicer):
         return response

     def Status(self, request, context):
+        self.logger.debug("Retrieving status")
         response = buildstream_pb2.StatusResponse()

         response.allow_updates = self.enable_push
@@ -448,8 +506,10 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
         self.artifactdir = artifactdir
         self.update_cas = update_cas
         os.makedirs(artifactdir, exist_ok=True)
+        self.logger = logging.getLogger("buildstream._cas.casserver")

     def GetArtifact(self, request, context):
+        self.logger.info("Getting artifact '%s'", request.cache_key)
         artifact_path = os.path.join(self.artifactdir, request.cache_key)
         if not os.path.exists(artifact_path):
             context.abort(grpc.StatusCode.NOT_FOUND, "Artifact proto not found")
@@ -500,6 +560,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
         return artifact

     def UpdateArtifact(self, request, context):
+        self.logger.info("Updating artifact '%s'", request.cache_key)
         artifact = request.artifact

         if self.update_cas:
@@ -524,6 +585,7 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
         return artifact

     def ArtifactStatus(self, request, context):
+        self.logger.info("Retrieving status")
         return artifact_pb2.ArtifactStatusResponse()

     def _check_directory(self, name, digest, context):
@@ -532,8 +594,10 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
             with open(self.cas.objpath(digest), "rb") as f:
                 directory.ParseFromString(f.read())
         except FileNotFoundError:
+            self.logger.warning("Artifact %s specified but no files found (%s)", name, self.cas.objpath(digest))
             context.abort(grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but no files found".format(name))
         except DecodeError:
+            self.logger.warning("Artifact %s specified but directory not found (%s)", name, self.cas.objpath(digest))
             context.abort(
                 grpc.StatusCode.FAILED_PRECONDITION, "Artifact {} specified but directory not found".format(name)
             )
@@ -558,8 +622,10 @@ class _BuildStreamCapabilitiesServicer(buildstream_pb2_grpc.CapabilitiesServicer):

 class _SourceServicer(source_pb2_grpc.SourceServiceServicer):
     def __init__(self, sourcedir):
         self.sourcedir = sourcedir
+        self.logger = logging.getLogger("buildstream._cas.casserver")

     def GetSource(self, request, context):
+        self.logger.info("Getting source '%s'", request.cache_key)
         try:
             source_proto = self._get_source(request.cache_key)
         except FileNotFoundError:
@@ -570,6 +636,7 @@ class _SourceServicer(source_pb2_grpc.SourceServiceServicer):
         return source_proto

     def UpdateSource(self, request, context):
+        self.logger.info("Updating source '%s'", request.cache_key)
         self._set_source(request.cache_key, request.source)
         return request.source
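Every servicer fetches the same "buildstream._cas.casserver" logger, so the single handler installed in create_server() covers all of the RPC logging above, and an embedding process can retune it in one place. For example (illustrative, not part of the patch):

    import logging

    # Drop casserver output below WARNING, whatever --log-level was given.
    logging.getLogger("buildstream._cas.casserver").setLevel(logging.WARNING)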