diff options
Diffstat (limited to 'src/buildstream/_cas/cascache.py')
-rw-r--r-- | src/buildstream/_cas/cascache.py | 67 |
1 files changed, 22 insertions, 45 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 7936121ea..b4e206370 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -22,11 +22,9 @@ import itertools import os import stat import contextlib -import ctypes -import multiprocessing -import signal import time from typing import Optional, List +import threading import grpc @@ -34,7 +32,7 @@ from .._protos.google.rpc import code_pb2 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 from .._protos.build.buildgrid import local_cas_pb2 -from .. import _signals, utils +from .. import utils from ..types import FastEnum, SourceRef from .._exceptions import CASCacheError @@ -93,6 +91,7 @@ class CASCache: self._casd_channel = self._casd_process_manager.create_channel() self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel) + self._cache_usage_monitor.start() # get_cas(): # @@ -132,7 +131,8 @@ class CASCache: # def release_resources(self, messenger=None): if self._cache_usage_monitor: - self._cache_usage_monitor.release_resources() + self._cache_usage_monitor.stop() + self._cache_usage_monitor.join() if self._casd_process_manager: self.close_grpc_channels() @@ -731,65 +731,42 @@ class _CASCacheUsage: # This manages the subprocess that tracks cache usage information via # buildbox-casd. # -class _CASCacheUsageMonitor: +class _CASCacheUsageMonitor(threading.Thread): def __init__(self, connection): + super().__init__() self._connection = connection - - # Shared memory (64-bit signed integer) for current disk usage and quota - self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1) - self._disk_quota = multiprocessing.Value(ctypes.c_longlong, -1) - - # multiprocessing.Process will fork without exec on Unix. - # This can't be allowed with background threads or open gRPC channels. - assert utils._is_single_threaded() and connection.is_closed() - - # Block SIGINT, we don't want to kill the process when we interrupt the frontend - # and this process if very lightweight. - with _signals.blocked([signal.SIGINT], ignore=False): - self._subprocess = multiprocessing.Process(target=self._subprocess_run) - self._subprocess.start() + self._disk_usage = None + self._disk_quota = None + self._should_stop = False def get_cache_usage(self): - disk_usage = self._disk_usage.value - disk_quota = self._disk_quota.value - - if disk_usage < 0: - # Disk usage still unknown - disk_usage = None - - if disk_quota <= 0: - # No disk quota - disk_quota = None - - return _CASCacheUsage(disk_usage, disk_quota) - - def release_resources(self): - # Simply terminate the subprocess, no cleanup required in the subprocess - self._subprocess.terminate() + # FIXME: remove this abstraction + return _CASCacheUsage(self._disk_usage, self._disk_quota) - def _subprocess_run(self): - # Reset SIGTERM in subprocess to default as no cleanup is necessary - signal.signal(signal.SIGTERM, signal.SIG_DFL) + def stop(self): + self._should_stop = True - disk_usage = self._disk_usage - disk_quota = self._disk_quota + def run(self): local_cas = self._connection.get_local_cas() - while True: + while not self._should_stop: try: # Ask buildbox-casd for current value request = local_cas_pb2.GetLocalDiskUsageRequest() response = local_cas.GetLocalDiskUsage(request) # Update values in shared memory - disk_usage.value = response.size_bytes - disk_quota.value = response.quota_bytes + self._disk_usage = response.size_bytes + self._disk_quota = response.quota_bytes except grpc.RpcError: # Terminate loop when buildbox-casd becomes unavailable break # Sleep until next refresh - time.sleep(_CACHE_USAGE_REFRESH) + for _ in range(_CACHE_USAGE_REFRESH * 10): + if self._should_stop: + break + time.sleep(0.1) def _grouper(iterable, n): |