diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2019-10-15 09:54:57 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-10-15 09:54:57 +0000 |
commit | 1d4703b5dbed7bf2f4af0c11c49dfd43f07bba9e (patch) | |
tree | c85ce9481e35c89cc0828f1d9be2d230c3e78e04 | |
parent | 13d9ab50e96d4a22f26ba9e4b67e7f2088b51edf (diff) | |
parent | 0d0dc00822243bd00da25b5b4823eadb50fe02db (diff) | |
download | buildstream-1d4703b5dbed7bf2f4af0c11c49dfd43f07bba9e.tar.gz |
Merge branch 'juerg/prepare-fork' into 'master'
Replace is_fork_allowed() with prepare_fork()
See merge request BuildStream/buildstream!1641
-rw-r--r-- | src/buildstream/_artifactcache.py | 4 | ||||
-rw-r--r-- | src/buildstream/_basecache.py | 13 | ||||
-rw-r--r-- | src/buildstream/_cas/cascache.py | 7 | ||||
-rw-r--r-- | src/buildstream/_context.py | 19 | ||||
-rw-r--r-- | src/buildstream/_remote.py | 45 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 9 | ||||
-rw-r--r-- | src/buildstream/_sourcecache.py | 4 | ||||
-rw-r--r-- | src/buildstream/plugins/sources/workspace.py | 3 | ||||
-rw-r--r-- | src/buildstream/source.py | 2 |
9 files changed, 45 insertions, 61 deletions
diff --git a/src/buildstream/_artifactcache.py b/src/buildstream/_artifactcache.py index b4d4efe00..d9112cd58 100644 --- a/src/buildstream/_artifactcache.py +++ b/src/buildstream/_artifactcache.py @@ -42,6 +42,10 @@ class ArtifactRemote(BaseRemote): super().__init__(*args, **kwargs) self.artifact_service = None + def close(self): + self.artifact_service = None + super().close() + # _configure_protocols(): # # Configure the protocols used by this remote as part of the diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py index df50bfb62..c2772d02c 100644 --- a/src/buildstream/_basecache.py +++ b/src/buildstream/_basecache.py @@ -74,16 +74,23 @@ class BaseCache(): return True return False - # release_resources(): + # close_grpc_channels(): # - # Release resources used by BaseCache. + # Close open gRPC channels. # - def release_resources(self): + def close_grpc_channels(self): # Close all remotes and their gRPC channels for project_remotes in chain(self._index_remotes.values(), self._storage_remotes.values()): for remote in project_remotes: remote.close() + # release_resources(): + # + # Release resources used by BaseCache. + # + def release_resources(self): + self.close_grpc_channels() + # specs_from_config_node() # # Parses the configuration of remote artifact caches from a config block. diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index b6893503f..83b8e8539 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -185,13 +185,14 @@ class CASCache(): def has_open_grpc_channels(self): return bool(self._casd_channel) - # close_channel(): + # close_grpc_channels(): # # Close the casd channel if it exists # - def close_channel(self): + def close_grpc_channels(self): if self._casd_channel: self._local_cas = None + self._casd_cas = None self._casd_channel.close() self._casd_channel = None @@ -204,7 +205,7 @@ class CASCache(): self._cache_usage_monitor.release_resources() if self._casd_process: - self.close_channel() + self.close_grpc_channels() self._terminate_casd_process(messenger) shutil.rmtree(self._casd_socket_tempdir) diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index 7ff993166..879555089 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -526,19 +526,16 @@ class Context(): log_level=log_level) return self._cascache - # is_fork_allowed(): + # prepare_fork(): # - # Return whether fork without exec is allowed. This is a safeguard against + # Prepare this process for fork without exec. This is a safeguard against # fork issues with multiple threads and gRPC connections. # - def is_fork_allowed(self): - # Do not allow fork if there are background threads. - if not utils._is_single_threaded(): - return False - - # Do not allow fork if there are open gRPC channels. + def prepare_fork(self): + # gRPC channels must be closed before fork. for cache in [self._cascache, self._artifactcache, self._sourcecache]: - if cache and cache.has_open_grpc_channels(): - return False + if cache: + cache.close_grpc_channels() - return True + # Do not allow fork if there are background threads. + return utils._is_single_threaded() diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py index 0c0fed44d..8527ca4cc 100644 --- a/src/buildstream/_remote.py +++ b/src/buildstream/_remote.py @@ -15,16 +15,12 @@ # License along with this library. If not, see <http://www.gnu.org/licenses/>. # -import multiprocessing import os -import signal from collections import namedtuple from urllib.parse import urlparse import grpc -from . import _signals -from . import utils from ._exceptions import LoadError, LoadErrorReason, ImplError, RemoteError from .types import FastEnum @@ -192,6 +188,8 @@ class BaseRemote(): self.channel.close() self.channel = None + self._initialized = False + # _configure_protocols(): # # An abstract method to configure remote-specific protocols. This @@ -217,41 +215,14 @@ class BaseRemote(): # RemoteError: If the grpc call fails. # def check(self): - queue = multiprocessing.Queue() - - def __check_remote(): - try: - self.init() - queue.put(self._check()) - - except grpc.RpcError as e: - # str(e) is too verbose for errors reported to the user - queue.put(e.details()) - - except Exception as e: # pylint: disable=broad-except - # Whatever happens, we need to return it to the calling process - # - queue.put(str(e)) - - process = multiprocessing.Process(target=__check_remote) - try: - # Keep SIGINT blocked in the child process - with _signals.blocked([signal.SIGINT], ignore=False): - process.start() - - error = queue.get() - process.join() - except KeyboardInterrupt: - utils._kill_process_tree(process.pid) - raise + self.init() + self._check() + except grpc.RpcError as e: + # str(e) is too verbose for errors reported to the user + raise RemoteError(e.details()) finally: - # Should not be necessary, but let's avoid keeping them - # alive too long - queue.close() - - if error: - raise RemoteError(error) + self.close() # _check(): # diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 6133cbfd7..d3faa2a8e 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -165,8 +165,6 @@ class Scheduler(): # def run(self, queues, casd_process): - assert self.context.is_fork_allowed() - # Hold on to the queues to process self.queues = queues @@ -408,6 +406,13 @@ class Scheduler(): # If that happens, do another round. process_queues = any(q.dequeue_ready() for q in self.queues) + # Make sure fork is allowed before starting jobs + if not self.context.prepare_fork(): + message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active") + self._notify(Notification(NotificationType.MESSAGE, message=message)) + self.terminate_jobs() + return + # Start the jobs # for job in ready: diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py index b987d9b8d..cdbe5b9cf 100644 --- a/src/buildstream/_sourcecache.py +++ b/src/buildstream/_sourcecache.py @@ -36,6 +36,10 @@ class SourceRemote(BaseRemote): super().__init__(*args, **kwargs) self.source_service = None + def close(self): + self.source_service = None + super().close() + def _configure_protocols(self): # set up source service self.source_service = source_pb2_grpc.SourceServiceStub(self.channel) diff --git a/src/buildstream/plugins/sources/workspace.py b/src/buildstream/plugins/sources/workspace.py index 0e7797fe8..8dbcc6218 100644 --- a/src/buildstream/plugins/sources/workspace.py +++ b/src/buildstream/plugins/sources/workspace.py @@ -103,9 +103,6 @@ class WorkspaceSource(Source): "Failed to stage source: files clash with existing directory", reason='ensure-stage-dir-fail') self.__digest = self.__cas_dir._get_digest().hash - # now close down grpc channels - cas.close_channel() - assert not cas.has_open_grpc_channels() return (self.path, self.__digest) def init_workspace(self, directory: Directory) -> None: diff --git a/src/buildstream/source.py b/src/buildstream/source.py index f4b682616..1fb318b52 100644 --- a/src/buildstream/source.py +++ b/src/buildstream/source.py @@ -1085,8 +1085,6 @@ class Source(Plugin): sourcecache = self._get_context().sourcecache if self.get_kind() == 'workspace' and not sourcecache.contains(self): sourcecache.commit(self, []) - sourcecache.cas.close_channel() - assert not sourcecache.cas.has_open_grpc_channels() @property def _key(self): |