summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-10-15 09:54:57 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-10-15 09:54:57 +0000
commit1d4703b5dbed7bf2f4af0c11c49dfd43f07bba9e (patch)
treec85ce9481e35c89cc0828f1d9be2d230c3e78e04
parent13d9ab50e96d4a22f26ba9e4b67e7f2088b51edf (diff)
parent0d0dc00822243bd00da25b5b4823eadb50fe02db (diff)
downloadbuildstream-1d4703b5dbed7bf2f4af0c11c49dfd43f07bba9e.tar.gz
Merge branch 'juerg/prepare-fork' into 'master'
Replace is_fork_allowed() with prepare_fork() See merge request BuildStream/buildstream!1641
-rw-r--r--src/buildstream/_artifactcache.py4
-rw-r--r--src/buildstream/_basecache.py13
-rw-r--r--src/buildstream/_cas/cascache.py7
-rw-r--r--src/buildstream/_context.py19
-rw-r--r--src/buildstream/_remote.py45
-rw-r--r--src/buildstream/_scheduler/scheduler.py9
-rw-r--r--src/buildstream/_sourcecache.py4
-rw-r--r--src/buildstream/plugins/sources/workspace.py3
-rw-r--r--src/buildstream/source.py2
9 files changed, 45 insertions, 61 deletions
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py
index b4d4efe00..d9112cd58 100644
--- a/src/buildstream/_artifactcache.py
+++ b/src/buildstream/_artifactcache.py
@@ -42,6 +42,10 @@ class ArtifactRemote(BaseRemote):
super().__init__(*args, **kwargs)
self.artifact_service = None
+ def close(self):
+ self.artifact_service = None
+ super().close()
+
# _configure_protocols():
#
# Configure the protocols used by this remote as part of the
diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py
index df50bfb62..c2772d02c 100644
--- a/src/buildstream/_basecache.py
+++ b/src/buildstream/_basecache.py
@@ -74,16 +74,23 @@ class BaseCache():
return True
return False
- # release_resources():
+ # close_grpc_channels():
#
- # Release resources used by BaseCache.
+ # Close open gRPC channels.
#
- def release_resources(self):
+ def close_grpc_channels(self):
# Close all remotes and their gRPC channels
for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()):
for remote in project_remotes:
remote.close()
+ # release_resources():
+ #
+ # Release resources used by BaseCache.
+ #
+ def release_resources(self):
+ self.close_grpc_channels()
+
# specs_from_config_node()
#
# Parses the configuration of remote artifact caches from a config block.
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index b6893503f..83b8e8539 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -185,13 +185,14 @@ class CASCache():
def has_open_grpc_channels(self):
return bool(self._casd_channel)
- # close_channel():
+ # close_grpc_channels():
#
# Close the casd channel if it exists
#
- def close_channel(self):
+ def close_grpc_channels(self):
if self._casd_channel:
self._local_cas = None
+ self._casd_cas = None
self._casd_channel.close()
self._casd_channel = None
@@ -204,7 +205,7 @@ class CASCache():
self._cache_usage_monitor.release_resources()
if self._casd_process:
- self.close_channel()
+ self.close_grpc_channels()
self._terminate_casd_process(messenger)
shutil.rmtree(self._casd_socket_tempdir)
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 7ff993166..879555089 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -526,19 +526,16 @@ class Context():
log_level=log_level)
return self._cascache
- # is_fork_allowed():
+ # prepare_fork():
#
- # Return whether fork without exec is allowed. This is a safeguard against
+ # Prepare this process for fork without exec. This is a safeguard against
# fork issues with multiple threads and gRPC connections.
#
- def is_fork_allowed(self):
- # Do not allow fork if there are background threads.
- if not utils._is_single_threaded():
- return False
-
- # Do not allow fork if there are open gRPC channels.
+ def prepare_fork(self):
+ # gRPC channels must be closed before fork.
for cache in [self._cascache, self._artifactcache, self._sourcecache]:
- if cache and cache.has_open_grpc_channels():
- return False
+ if cache:
+ cache.close_grpc_channels()
- return True
+ # Do not allow fork if there are background threads.
+ return utils._is_single_threaded()
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
index 0c0fed44d..8527ca4cc 100644
--- a/src/buildstream/_remote.py
+++ b/src/buildstream/_remote.py
@@ -15,16 +15,12 @@
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
-import multiprocessing
import os
-import signal
from collections import namedtuple
from urllib.parse import urlparse
import grpc
-from . import _signals
-from . import utils
from ._exceptions import LoadError, LoadErrorReason, ImplError, RemoteError
from .types import FastEnum
@@ -192,6 +188,8 @@ class BaseRemote():
self.channel.close()
self.channel = None
+ self._initialized = False
+
# _configure_protocols():
#
# An abstract method to configure remote-specific protocols. This
@@ -217,41 +215,14 @@ class BaseRemote():
# RemoteError: If the grpc call fails.
#
def check(self):
- queue = multiprocessing.Queue()
-
- def __check_remote():
- try:
- self.init()
- queue.put(self._check())
-
- except grpc.RpcError as e:
- # str(e) is too verbose for errors reported to the user
- queue.put(e.details())
-
- except Exception as e: # pylint: disable=broad-except
- # Whatever happens, we need to return it to the calling process
- #
- queue.put(str(e))
-
- process = multiprocessing.Process(target=__check_remote)
-
try:
- # Keep SIGINT blocked in the child process
- with _signals.blocked([signal.SIGINT], ignore=False):
- process.start()
-
- error = queue.get()
- process.join()
- except KeyboardInterrupt:
- utils._kill_process_tree(process.pid)
- raise
+ self.init()
+ self._check()
+ except grpc.RpcError as e:
+ # str(e) is too verbose for errors reported to the user
+ raise RemoteError(e.details())
finally:
- # Should not be necessary, but let's avoid keeping them
- # alive too long
- queue.close()
-
- if error:
- raise RemoteError(error)
+ self.close()
# _check():
#
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6133cbfd7..d3faa2a8e 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -165,8 +165,6 @@ class Scheduler():
#
def run(self, queues, casd_process):
- assert self.context.is_fork_allowed()
-
# Hold on to the queues to process
self.queues = queues
@@ -408,6 +406,13 @@ class Scheduler():
# If that happens, do another round.
process_queues = any(q.dequeue_ready() for q in self.queues)
+ # Make sure fork is allowed before starting jobs
+ if not self.context.prepare_fork():
+ message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
+ self._notify(Notification(NotificationType.MESSAGE, message=message))
+ self.terminate_jobs()
+ return
+
# Start the jobs
#
for job in ready:
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index b987d9b8d..cdbe5b9cf 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -36,6 +36,10 @@ class SourceRemote(BaseRemote):
super().__init__(*args, **kwargs)
self.source_service = None
+ def close(self):
+ self.source_service = None
+ super().close()
+
def _configure_protocols(self):
# set up source service
self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)
diff --git a/src/buildstream/plugins/sources/workspace.py b/src/buildstream/plugins/sources/workspace.py
index 0e7797fe8..8dbcc6218 100644
--- a/src/buildstream/plugins/sources/workspace.py
+++ b/src/buildstream/plugins/sources/workspace.py
@@ -103,9 +103,6 @@ class WorkspaceSource(Source):
"Failed to stage source: files clash with existing directory",
reason='ensure-stage-dir-fail')
self.__digest = self.__cas_dir._get_digest().hash
- # now close down grpc channels
- cas.close_channel()
- assert not cas.has_open_grpc_channels()
return (self.path, self.__digest)
def init_workspace(self, directory: Directory) -> None:
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index f4b682616..1fb318b52 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -1085,8 +1085,6 @@ class Source(Plugin):
sourcecache = self._get_context().sourcecache
if self.get_kind() == 'workspace' and not sourcecache.contains(self):
sourcecache.commit(self, [])
- sourcecache.cas.close_channel()
- assert not sourcecache.cas.has_open_grpc_channels()
@property
def _key(self):