summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAngelos Evripiotis <jevripiotis@bloomberg.net>2019-11-14 18:27:03 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-11-22 16:52:26 +0000
commit95ee348004266cd92475a002f8bb55e7a705b9d5 (patch)
tree426339a6554572f5aa8c528b2e6cb10ea64cba5c
parent44c9f09a1f6743775ef4fe7cde0e6820be3f4254 (diff)
downloadbuildstream-95ee348004266cd92475a002f8bb55e7a705b9d5.tar.gz
cascache: _CASCacheUsageMonitor has channel
Change _CASCacheUsageMonitor to hold a CASDChannel instead of it's parent CASCache. This means that when in spawn mode, we don't need to pickle the CASCache, only the CASDChannel.
-rw-r--r--src/buildstream/_cas/cascache.py18
-rw-r--r--src/buildstream/_cas/casdprocessmanager.py2
2 files changed, 6 insertions, 14 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index e46396bc0..ac316ea25 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -85,7 +85,7 @@ class CASCache:
)
self._casd_channel = self._casd_process_manager.create_channel()
- self._cache_usage_monitor = _CASCacheUsageMonitor(self)
+ self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel)
def __getstate__(self):
state = self.__dict__.copy()
@@ -134,14 +134,6 @@ class CASCache:
if not (os.path.isdir(headdir) and os.path.isdir(objdir)):
raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir))
- # has_open_grpc_channels():
- #
- # Return whether there are gRPC channel instances. This is used to safeguard
- # against fork() with open gRPC channels.
- #
- def has_open_grpc_channels(self):
- return self._casd_lazy_connection and not self._casd_lazy_connection.is_closed()
-
# close_grpc_channels():
#
# Close the casd channel if it exists
@@ -977,8 +969,8 @@ class _CASCacheUsage:
# buildbox-casd.
#
class _CASCacheUsageMonitor:
- def __init__(self, cas):
- self.cas = cas
+ def __init__(self, connection):
+ self._connection = connection
# Shared memory (64-bit signed integer) for current disk usage and quota
self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1)
@@ -986,7 +978,7 @@ class _CASCacheUsageMonitor:
# multiprocessing.Process will fork without exec on Unix.
# This can't be allowed with background threads or open gRPC channels.
- assert utils._is_single_threaded() and not cas.has_open_grpc_channels()
+ assert utils._is_single_threaded() and connection.is_closed()
# Block SIGINT, we don't want to kill the process when we interrupt the frontend
# and this process if very lightweight.
@@ -1018,7 +1010,7 @@ class _CASCacheUsageMonitor:
disk_usage = self._disk_usage
disk_quota = self._disk_quota
- local_cas = self.cas.get_local_cas()
+ local_cas = self._connection.get_local_cas()
while True:
try:
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index d6e89831a..5e15f95d6 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -219,7 +219,7 @@ class CASDChannel:
# is_closed():
#
- # Return whether the channel is closed or not.
+ # Return whether this connection is closed or not.
#
def is_closed(self):
return self._casd_channel is None