summaryrefslogtreecommitdiff
path: root/src/buildstream/_cas/cascache.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_cas/cascache.py')
-rw-r--r--src/buildstream/_cas/cascache.py67
1 files changed, 22 insertions, 45 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 7936121ea..b4e206370 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -22,11 +22,9 @@ import itertools
import os
import stat
import contextlib
-import ctypes
-import multiprocessing
-import signal
import time
from typing import Optional, List
+import threading
import grpc
@@ -34,7 +32,7 @@ from .._protos.google.rpc import code_pb2
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .._protos.build.buildgrid import local_cas_pb2
-from .. import _signals, utils
+from .. import utils
from ..types import FastEnum, SourceRef
from .._exceptions import CASCacheError
@@ -93,6 +91,7 @@ class CASCache:
self._casd_channel = self._casd_process_manager.create_channel()
self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel)
+ self._cache_usage_monitor.start()
# get_cas():
#
@@ -132,7 +131,8 @@ class CASCache:
#
def release_resources(self, messenger=None):
if self._cache_usage_monitor:
- self._cache_usage_monitor.release_resources()
+ self._cache_usage_monitor.stop()
+ self._cache_usage_monitor.join()
if self._casd_process_manager:
self.close_grpc_channels()
@@ -731,65 +731,42 @@ class _CASCacheUsage:
# This manages the subprocess that tracks cache usage information via
# buildbox-casd.
#
-class _CASCacheUsageMonitor:
+class _CASCacheUsageMonitor(threading.Thread):
def __init__(self, connection):
+ super().__init__()
self._connection = connection
-
- # Shared memory (64-bit signed integer) for current disk usage and quota
- self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1)
- self._disk_quota = multiprocessing.Value(ctypes.c_longlong, -1)
-
- # multiprocessing.Process will fork without exec on Unix.
- # This can't be allowed with background threads or open gRPC channels.
- assert utils._is_single_threaded() and connection.is_closed()
-
- # Block SIGINT, we don't want to kill the process when we interrupt the frontend
- # and this process if very lightweight.
- with _signals.blocked([signal.SIGINT], ignore=False):
- self._subprocess = multiprocessing.Process(target=self._subprocess_run)
- self._subprocess.start()
+ self._disk_usage = None
+ self._disk_quota = None
+ self._should_stop = False
def get_cache_usage(self):
- disk_usage = self._disk_usage.value
- disk_quota = self._disk_quota.value
-
- if disk_usage < 0:
- # Disk usage still unknown
- disk_usage = None
-
- if disk_quota <= 0:
- # No disk quota
- disk_quota = None
-
- return _CASCacheUsage(disk_usage, disk_quota)
-
- def release_resources(self):
- # Simply terminate the subprocess, no cleanup required in the subprocess
- self._subprocess.terminate()
+ # FIXME: remove this abstraction
+ return _CASCacheUsage(self._disk_usage, self._disk_quota)
- def _subprocess_run(self):
- # Reset SIGTERM in subprocess to default as no cleanup is necessary
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ def stop(self):
+ self._should_stop = True
- disk_usage = self._disk_usage
- disk_quota = self._disk_quota
+ def run(self):
local_cas = self._connection.get_local_cas()
- while True:
+ while not self._should_stop:
try:
# Ask buildbox-casd for current value
request = local_cas_pb2.GetLocalDiskUsageRequest()
response = local_cas.GetLocalDiskUsage(request)
# Update values in shared memory
- disk_usage.value = response.size_bytes
- disk_quota.value = response.quota_bytes
+ self._disk_usage = response.size_bytes
+ self._disk_quota = response.quota_bytes
except grpc.RpcError:
# Terminate loop when buildbox-casd becomes unavailable
break
# Sleep until next refresh
- time.sleep(_CACHE_USAGE_REFRESH)
+ for _ in range(_CACHE_USAGE_REFRESH * 10):
+ if self._should_stop:
+ break
+ time.sleep(0.1)
def _grouper(iterable, n):