From 0139e4739f9c9fc5a17e5ca04305fe1187dee00b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Tue, 3 Sep 2019 09:05:35 +0200 Subject: requirements: Update grpcio to 1.23.0 I see aborts and hangs related to gRPC locks with grpcio 1.17.1. Updating grpcio to 1.23.0 fixes these issues. --- requirements/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 0f1f5599f..370316a68 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,5 +1,5 @@ Click==7.0 -grpcio==1.17.1 +grpcio==1.23.0 Jinja2==2.10 pluginbase==0.7 protobuf==3.6.1 -- cgit v1.2.1 From 088705c4c429cc467f8a89ea39958a56a795b5a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 08:40:39 +0200 Subject: cascache.py: Close gRPC channel in release_resources() --- src/buildstream/_cas/cascache.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 03d7e5dda..af69e3d68 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -150,6 +150,11 @@ class CASCache(): # def release_resources(self, messenger=None): if self._casd_process: + if self._casd_channel: + self._local_cas = None + self._casd_channel.close() + self._casd_channel = None + self._casd_process.terminate() try: # Don't print anything if buildbox-casd terminates quickly -- cgit v1.2.1 From 733cbfafffd79873f401a198d2986fc82773f57e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 08:43:08 +0200 Subject: casremote.py: Add close() method to close gRPC channel Also support use as context manager. --- src/buildstream/_cas/casremote.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index 9cc1a5488..05b57d3d4 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -183,6 +183,18 @@ class CASRemote(): self._initialized = True + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + return False + + def close(self): + if self.channel: + self.channel.close() + self.channel = None + # check_remote # # Used when checking whether remote_specs work in the buildstream main -- cgit v1.2.1 From 418c304de0ecebe588d5c80efac748b33c69cfbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 09:03:37 +0200 Subject: _basecache.py: Add release_resources() method --- src/buildstream/_basecache.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py index 46de29f7b..431a7304e 100644 --- a/src/buildstream/_basecache.py +++ b/src/buildstream/_basecache.py @@ -59,6 +59,16 @@ class BaseCache(): self._has_fetch_remotes = False self._has_push_remotes = False + # release_resources(): + # + # Release resources used by BaseCache. + # + def release_resources(self): + # Close all remotes and their gRPC channels + for project_remotes in self._remotes.values(): + for remote in project_remotes: + remote.close() + # specs_from_config_node() # # Parses the configuration of remote artifact caches from a config block. -- cgit v1.2.1 From 6c1f5b99ea263c226f554c24ccb428cd8a664340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 09:04:43 +0200 Subject: _context.py: Release artifactcache and sourcecache resources --- src/buildstream/_context.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index 0d250eb56..9f6fdaf01 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -182,6 +182,12 @@ class Context(): # Called when exiting the with-statement context. # def __exit__(self, exc_type, exc_value, traceback): + if self._artifactcache: + self._artifactcache.release_resources() + + if self._sourcecache: + self._sourcecache.release_resources() + if self._cascache: self._cascache.release_resources(self.messenger) -- cgit v1.2.1 From 47b4cee0d474132e8863d1d917f5e961b24bb0e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 08:48:57 +0200 Subject: _sandboxremote.py: Use context manager for gRPC channels --- src/buildstream/sandbox/_sandboxremote.py | 58 ++++++++++++++++--------------- 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 4308d662b..b0c40a01c 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -351,19 +351,6 @@ class SandboxRemote(Sandbox): input_root_digest=input_root_digest) action_digest = utils._message_digest(action.SerializeToString()) - # Next, try to create a communication channel to the BuildGrid server. - url = urlparse(self.exec_url) - if not url.port: - raise SandboxError("You must supply a protocol and port number in the execution-service url, " - "for example: http://buildservice:50051.") - if url.scheme == 'http': - channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) - elif url.scheme == 'https': - channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials) - else: - raise SandboxError("Remote execution currently only supports the 'http' protocol " - "and '{}' was supplied.".format(url.scheme)) - # check action cache download and download if there action_result = self._check_action_cache(action_digest) @@ -407,9 +394,23 @@ class SandboxRemote(Sandbox): except grpc.RpcError as e: raise SandboxError("Failed to push action to remote: {}".format(e)) + # Next, try to create a communication channel to the BuildGrid server. + url = urlparse(self.exec_url) + if not url.port: + raise SandboxError("You must supply a protocol and port number in the execution-service url, " + "for example: http://buildservice:50051.") + if url.scheme == 'http': + channel = grpc.insecure_channel('{}:{}'.format(url.hostname, url.port)) + elif url.scheme == 'https': + channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.exec_credentials) + else: + raise SandboxError("Remote execution currently only supports the 'http' protocol " + "and '{}' was supplied.".format(url.scheme)) + # Now request to execute the action - operation = self.run_remote_command(channel, action_digest) - action_result = self._extract_action_result(operation) + with channel: + operation = self.run_remote_command(channel, action_digest) + action_result = self._extract_action_result(operation) # Get output of build self.process_job_output(action_result.output_directories, action_result.output_files, @@ -445,20 +446,21 @@ class SandboxRemote(Sandbox): elif url.scheme == 'https': channel = grpc.secure_channel('{}:{}'.format(url.hostname, url.port), self.action_credentials) - request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance, - action_digest=action_digest) - stub = remote_execution_pb2_grpc.ActionCacheStub(channel) - try: - result = stub.GetActionResult(request) - except grpc.RpcError as e: - if e.code() != grpc.StatusCode.NOT_FOUND: - raise SandboxError("Failed to query action cache: {} ({})" - .format(e.code(), e.details())) + with channel: + request = remote_execution_pb2.GetActionResultRequest(instance_name=self.action_instance, + action_digest=action_digest) + stub = remote_execution_pb2_grpc.ActionCacheStub(channel) + try: + result = stub.GetActionResult(request) + except grpc.RpcError as e: + if e.code() != grpc.StatusCode.NOT_FOUND: + raise SandboxError("Failed to query action cache: {} ({})" + .format(e.code(), e.details())) + else: + return None else: - return None - else: - self.info("Action result found in action cache") - return result + self.info("Action result found in action cache") + return result def _create_command(self, command, working_directory, environment): # Creates a command proto -- cgit v1.2.1 From 801f26de8de39c6e888fb0f3fd2eb4a554a29098 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 08:55:51 +0200 Subject: _sandboxremote.py: Use context manager for CASRemote instances --- src/buildstream/sandbox/_sandboxremote.py | 88 ++++++++++++++++--------------- 1 file changed, 45 insertions(+), 43 deletions(-) diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index b0c40a01c..e9e145d1f 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -281,10 +281,11 @@ class SandboxRemote(Sandbox): context = self._get_context() cascache = context.get_cascache() artifactcache = context.artifactcache - casremote = CASRemote(self.storage_remote_spec, cascache) - # Now do a pull to ensure we have the full directory structure. - dir_digest = cascache.pull_tree(casremote, tree_digest) + with CASRemote(self.storage_remote_spec, cascache) as casremote: + # Now do a pull to ensure we have the full directory structure. + dir_digest = cascache.pull_tree(casremote, tree_digest) + if dir_digest is None or not dir_digest.hash or not dir_digest.size_bytes: raise SandboxError("Output directory structure pulling from remote failed.") @@ -300,7 +301,6 @@ class SandboxRemote(Sandbox): project = self._get_project() cascache = context.get_cascache() artifactcache = context.artifactcache - casremote = CASRemote(self.storage_remote_spec, cascache) # Fetch the file blobs if needed if self._output_files_required or artifactcache.has_push_remotes(): @@ -319,7 +319,9 @@ class SandboxRemote(Sandbox): # artifact servers. blobs_to_fetch = artifactcache.find_missing_blobs(project, local_missing_blobs) - remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch) + with CASRemote(self.storage_remote_spec, cascache) as casremote: + remote_missing_blobs = cascache.fetch_blobs(casremote, blobs_to_fetch) + if remote_missing_blobs: raise SandboxError("{} output files are missing on the CAS server" .format(len(remote_missing_blobs))) @@ -355,44 +357,44 @@ class SandboxRemote(Sandbox): action_result = self._check_action_cache(action_digest) if not action_result: - casremote = CASRemote(self.storage_remote_spec, cascache) - try: - casremote.init() - except grpc.RpcError as e: - raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}" - .format(self.storage_url, e)) from e - - # Determine blobs missing on remote - try: - missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest) - except grpc.RpcError as e: - raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e - - # Check if any blobs are also missing locally (partial artifact) - # and pull them from the artifact cache. - try: - local_missing_blobs = cascache.local_missing_blobs(missing_blobs) - if local_missing_blobs: - artifactcache.fetch_missing_blobs(project, local_missing_blobs) - except (grpc.RpcError, BstError) as e: - raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e - - # Now, push the missing blobs to the remote. - try: - cascache.send_blobs(casremote, missing_blobs) - except grpc.RpcError as e: - raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e - - # Push command and action - try: - casremote.push_message(command_proto) - except grpc.RpcError as e: - raise SandboxError("Failed to push command to remote: {}".format(e)) - - try: - casremote.push_message(action) - except grpc.RpcError as e: - raise SandboxError("Failed to push action to remote: {}".format(e)) + with CASRemote(self.storage_remote_spec, cascache) as casremote: + try: + casremote.init() + except grpc.RpcError as e: + raise SandboxError("Failed to contact remote execution CAS endpoint at {}: {}" + .format(self.storage_url, e)) from e + + # Determine blobs missing on remote + try: + missing_blobs = cascache.remote_missing_blobs_for_directory(casremote, input_root_digest) + except grpc.RpcError as e: + raise SandboxError("Failed to determine missing blobs: {}".format(e)) from e + + # Check if any blobs are also missing locally (partial artifact) + # and pull them from the artifact cache. + try: + local_missing_blobs = cascache.local_missing_blobs(missing_blobs) + if local_missing_blobs: + artifactcache.fetch_missing_blobs(project, local_missing_blobs) + except (grpc.RpcError, BstError) as e: + raise SandboxError("Failed to pull missing blobs from artifact cache: {}".format(e)) from e + + # Now, push the missing blobs to the remote. + try: + cascache.send_blobs(casremote, missing_blobs) + except grpc.RpcError as e: + raise SandboxError("Failed to push source directory to remote: {}".format(e)) from e + + # Push command and action + try: + casremote.push_message(command_proto) + except grpc.RpcError as e: + raise SandboxError("Failed to push command to remote: {}".format(e)) + + try: + casremote.push_message(action) + except grpc.RpcError as e: + raise SandboxError("Failed to push action to remote: {}".format(e)) # Next, try to create a communication channel to the BuildGrid server. url = urlparse(self.exec_url) -- cgit v1.2.1 From 429a9c0f597305699a6c05856784a06f8983bd2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Tue, 3 Sep 2019 06:18:30 +0200 Subject: tests/artifactcache: Use context manager for gRPC channels --- tests/artifactcache/artifactservice.py | 100 ++++++++++++++++----------------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/tests/artifactcache/artifactservice.py b/tests/artifactcache/artifactservice.py index b3bc7c218..00d14b45d 100644 --- a/tests/artifactcache/artifactservice.py +++ b/tests/artifactcache/artifactservice.py @@ -38,19 +38,19 @@ def test_artifact_get_not_found(tmpdir): with create_artifact_share(sharedir) as share: # set up artifact service stub url = urlparse(share.repo) - channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) - artifact_stub = ArtifactServiceStub(channel) + with grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) as channel: + artifact_stub = ArtifactServiceStub(channel) - # Run GetArtifact and check it throws a not found error - request = GetArtifactRequest() - request.cache_key = "@artifact/something/not_there" - try: - artifact_stub.GetArtifact(request) - except grpc.RpcError as e: - assert e.code() == grpc.StatusCode.NOT_FOUND - assert e.details() == "Artifact proto not found" - else: - assert False + # Run GetArtifact and check it throws a not found error + request = GetArtifactRequest() + request.cache_key = "@artifact/something/not_there" + try: + artifact_stub.GetArtifact(request) + except grpc.RpcError as e: + assert e.code() == grpc.StatusCode.NOT_FOUND + assert e.details() == "Artifact proto not found" + else: + assert False # Successfully getting the artifact @@ -70,42 +70,42 @@ def test_update_artifact(tmpdir, files): url = urlparse(share.repo) - channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) - artifact_stub = ArtifactServiceStub(channel) - - # initialise an artifact - artifact = Artifact() - artifact.version = 0 - artifact.build_success = True - artifact.strong_key = "abcdefghijklmnop" - artifact.files.hash = "hashymchashash" - artifact.files.size_bytes = 10 - - artifact.files.CopyFrom(digest) - - # Put it in the artifact share with an UpdateArtifactRequest - request = UpdateArtifactRequest() - request.artifact.CopyFrom(artifact) - request.cache_key = "a-cache-key" - - # should return the same artifact back - if files == "present": - response = artifact_stub.UpdateArtifact(request) + with grpc.insecure_channel("{}:{}".format(url.hostname, url.port)) as channel: + artifact_stub = ArtifactServiceStub(channel) + + # initialise an artifact + artifact = Artifact() + artifact.version = 0 + artifact.build_success = True + artifact.strong_key = "abcdefghijklmnop" + artifact.files.hash = "hashymchashash" + artifact.files.size_bytes = 10 + + artifact.files.CopyFrom(digest) + + # Put it in the artifact share with an UpdateArtifactRequest + request = UpdateArtifactRequest() + request.artifact.CopyFrom(artifact) + request.cache_key = "a-cache-key" + + # should return the same artifact back + if files == "present": + response = artifact_stub.UpdateArtifact(request) + assert response == artifact + else: + try: + artifact_stub.UpdateArtifact(request) + except grpc.RpcError as e: + assert e.code() == grpc.StatusCode.FAILED_PRECONDITION + if files == "absent": + assert e.details() == "Artifact files specified but no files found" + elif files == "invalid": + assert e.details() == "Artifact files specified but directory not found" + return + + # If we uploaded the artifact check GetArtifact + request = GetArtifactRequest() + request.cache_key = "a-cache-key" + + response = artifact_stub.GetArtifact(request) assert response == artifact - else: - try: - artifact_stub.UpdateArtifact(request) - except grpc.RpcError as e: - assert e.code() == grpc.StatusCode.FAILED_PRECONDITION - if files == "absent": - assert e.details() == "Artifact files specified but no files found" - elif files == "invalid": - assert e.details() == "Artifact files specified but directory not found" - return - - # If we uploaded the artifact check GetArtifact - request = GetArtifactRequest() - request.cache_key = "a-cache-key" - - response = artifact_stub.GetArtifact(request) - assert response == artifact -- cgit v1.2.1 From 655f11fcff2e7541aa8d4e263d128e6672a302e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Tue, 3 Sep 2019 06:18:35 +0200 Subject: tests/testutils/http_server.py: Close queue to stop background thread --- tests/testutils/http_server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/testutils/http_server.py b/tests/testutils/http_server.py index 049769e72..6ecb7b5b3 100644 --- a/tests/testutils/http_server.py +++ b/tests/testutils/http_server.py @@ -106,6 +106,8 @@ class SimpleHttpServer(multiprocessing.Process): self.__stop.put(None) self.terminate() self.join() + self.__stop.close() + self.__stop.join_thread() def allow_anonymous(self, cwd): self.server.anonymous_dir = cwd -- cgit v1.2.1 From d6c667a936f250fd6497d553cc8d447b84314929 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 11:47:53 +0200 Subject: utils.py: Add _is_single_threaded() method This will be used to safeguard against fork issues with multiple threads. --- src/buildstream/utils.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index 998b77a71..872b5bd59 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -57,6 +57,10 @@ _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"] # Main process pid _MAIN_PID = os.getpid() +# The number of threads in the main process at startup. +# This is 1 except for certain test environments (xdist/execnet). +_INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1 + class UtilError(BstError): """Raised by utility functions when system calls fail. @@ -1391,3 +1395,19 @@ def _get_compression(tar): else: # Assume just an unconventional name was provided, default to uncompressed return '' + + +# _is_single_threaded() +# +# Return whether the current Process is single-threaded. Don't count threads +# in the main process that were created by a test environment (xdist/execnet) +# before BuildStream was executed. +# +def _is_single_threaded(): + # Use psutil as threading.active_count() doesn't include gRPC threads. + process = psutil.Process() + num_threads = process.num_threads() + if process.pid == _MAIN_PID: + return num_threads == _INITIAL_NUM_THREADS_IN_MAIN_PROCESS + else: + return num_threads == 1 -- cgit v1.2.1 From df3000b05fd1b8f65289b3b0fbf476a19ef61869 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 12:12:50 +0200 Subject: cascache.py: Add has_open_grpc_channels() method --- src/buildstream/_cas/cascache.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index af69e3d68..831229fe8 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -136,6 +136,14 @@ class CASCache(): if not (os.path.isdir(headdir) and os.path.isdir(objdir)): raise CASCacheError("CAS repository check failed for '{}'".format(self.casdir)) + # has_open_grpc_channels(): + # + # Return whether there are gRPC channel instances. This is used to safeguard + # against fork() with open gRPC channels. + # + def has_open_grpc_channels(self): + return bool(self._casd_channel) + # notify_fork_disabled(): # # Called by Context when fork() is disabled. This will enable communication -- cgit v1.2.1 From d13336addeb1c5726a3b84c1536dc29781b87df4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 12:15:38 +0200 Subject: _basecache.py: Add has_open_grpc_channels() method --- src/buildstream/_basecache.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/buildstream/_basecache.py b/src/buildstream/_basecache.py index 431a7304e..9ad6c1277 100644 --- a/src/buildstream/_basecache.py +++ b/src/buildstream/_basecache.py @@ -59,6 +59,18 @@ class BaseCache(): self._has_fetch_remotes = False self._has_push_remotes = False + # has_open_grpc_channels(): + # + # Return whether there are gRPC channel instances. This is used to safeguard + # against fork() with open gRPC channels. + # + def has_open_grpc_channels(self): + for project_remotes in self._remotes.values(): + for remote in project_remotes: + if remote.channel: + return True + return False + # release_resources(): # # Release resources used by BaseCache. -- cgit v1.2.1 From 7dd41ee3c635891aedfd25a11a9e030222c244c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 11:57:09 +0200 Subject: _context.py: Add is_fork_allowed() method --- src/buildstream/_context.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index 9f6fdaf01..fa59b0109 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -515,3 +515,20 @@ class Context(): self.fork_allowed = False cascache = self.get_cascache() cascache.notify_fork_disabled() + + # is_fork_allowed(): + # + # Return whether fork without exec is allowed. This is a safeguard against + # fork issues with multiple threads and gRPC connections. + # + def is_fork_allowed(self): + # Do not allow fork if there are background threads. + if not utils._is_single_threaded(): + return False + + # Do not allow fork if there are open gRPC channels. + for cache in [self._cascache, self._artifactcache, self._sourcecache]: + if cache and cache.has_open_grpc_channels(): + return False + + return True -- cgit v1.2.1 From 5545cb436a70a4b5e2e1f594fa57502a47439a57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 11:58:12 +0200 Subject: scheduler.py: Use Context.is_fork_allowed() --- src/buildstream/_scheduler/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index b29bc8841..37295b285 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -110,7 +110,7 @@ class Scheduler(): # def run(self, queues): - assert self.context.fork_allowed + assert self.context.is_fork_allowed() # Hold on to the queues to process self.queues = queues -- cgit v1.2.1 From 312f7081bb5ec48b8405c0256429c2e153138185 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 12:18:54 +0200 Subject: _stream.py: Remove disable_fork() calls The fork safeguard is now handled by Context.is_fork_allowed(). --- src/buildstream/_stream.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 9ff93fdc3..f0e891dcf 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -223,8 +223,6 @@ class Stream(): else: buildtree = True - self._context.disable_fork() - return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree) @@ -551,8 +549,6 @@ class Stream(): self._enqueue_plan(uncached_elts) self._run() - self._context.disable_fork() - # Stage deps into a temporary sandbox first if isinstance(target, ArtifactElement): try: @@ -623,7 +619,6 @@ class Stream(): load_refs=True) if self._artifacts.has_fetch_remotes(): - self._context.disable_fork() self._pipeline.check_remotes(target_objects) # XXX: We need to set the name of an ArtifactElement to its ref in order -- cgit v1.2.1 From ab56feac81b780f4e35f3c7c96efaa73007a59b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 12:22:51 +0200 Subject: _context.py: Remove unused disable_fork() method --- src/buildstream/_context.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index fa59b0109..ecdf6f97b 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -150,8 +150,6 @@ class Context(): # Whether file contents are required for all artifacts in the local cache self.require_artifact_files = True - self.fork_allowed = True - # Whether elements must be rebuilt when their dependencies have changed self._strict_build_plan = None @@ -506,16 +504,6 @@ class Context(): self._cascache = CASCache(self.cachedir, cache_quota=self.config_cache_quota) return self._cascache - # disable_fork(): - # - # This will prevent the scheduler from running but will allow communication - # with casd in the main process. - # - def disable_fork(self): - self.fork_allowed = False - cascache = self.get_cascache() - cascache.notify_fork_disabled() - # is_fork_allowed(): # # Return whether fork without exec is allowed. This is a safeguard against -- cgit v1.2.1 From 2ce01dafc17a8ead15f95a3ae11954d8b7839cc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 12:23:34 +0200 Subject: casserver.py: Remove notify_fork_disabled() call The fork safeguard is now handled by Context.is_fork_allowed(). --- src/buildstream/_cas/casserver.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py index 1835d118e..5a4c2b7ac 100644 --- a/src/buildstream/_cas/casserver.py +++ b/src/buildstream/_cas/casserver.py @@ -60,10 +60,6 @@ def create_server(repo, *, enable_push, quota): cas = CASCache(os.path.abspath(repo), cache_quota=quota, protect_session_blobs=False) try: - # Allow gRPC communication in main process as bst-artifact-server - # doesn't use forked subprocesses. - cas.notify_fork_disabled() - artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs') sourcedir = os.path.join(os.path.abspath(repo), 'source_protos') -- cgit v1.2.1 From 0963264d0021a9a0121ff769748814acbf1b8ff7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 12:25:01 +0200 Subject: cascache.py: Remove fork_disabled mechanism The fork safeguard is now handled by Context.is_fork_allowed(). --- src/buildstream/_cas/cascache.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index 831229fe8..21c9f0ad6 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -86,7 +86,6 @@ class CASCache(): self._casd_channel = None self._local_cas = None - self._fork_disabled = False def __getstate__(self): state = self.__dict__.copy() @@ -102,9 +101,6 @@ class CASCache(): assert self._casd_process, "CASCache was instantiated without buildbox-casd" if not self._local_cas: - # gRPC doesn't support fork without exec, which is used in the main process. - assert self._fork_disabled or not utils._is_main_process() - self._casd_channel = grpc.insecure_channel('unix:' + self._casd_socket_path) self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) @@ -144,14 +140,6 @@ class CASCache(): def has_open_grpc_channels(self): return bool(self._casd_channel) - # notify_fork_disabled(): - # - # Called by Context when fork() is disabled. This will enable communication - # with casd via gRPC in the main process. - # - def notify_fork_disabled(self): - self._fork_disabled = True - # release_resources(): # # Release resources used by CASCache. -- cgit v1.2.1 From b4823df1c25ad5afd8449594b3b15b5676d6b9a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 11:44:12 +0200 Subject: testing/_forked.py: Update _MAIN_PID for tests running in subprocesses This reduces the difference between regular execution and the test environment. --- src/buildstream/testing/_forked.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/buildstream/testing/_forked.py b/src/buildstream/testing/_forked.py index af5e9c070..164906b0c 100644 --- a/src/buildstream/testing/_forked.py +++ b/src/buildstream/testing/_forked.py @@ -28,6 +28,8 @@ import pytest # XXX Using pytest private internals here from _pytest import runner +from buildstream import utils + EXITSTATUS_TESTEXIT = 4 @@ -48,6 +50,10 @@ def serialize_report(rep): def forked_run_report(item): def runforked(): + # This process is now the main BuildStream process + # for the duration of this test. + utils._MAIN_PID = os.getpid() + try: reports = runner.runtestprotocol(item, log=False) except KeyboardInterrupt: -- cgit v1.2.1 From 3221f3511cbfe6a748b542627801acf8d0fcb379 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=BCrg=20Billeter?= Date: Mon, 2 Sep 2019 09:47:05 +0200 Subject: tests: Catch tests that don't shut down background threads --- src/buildstream/testing/_fixtures.py | 31 ++++++++++++++++++++++++ src/buildstream/testing/_sourcetests/conftest.py | 17 +++++++++++++ tests/conftest.py | 1 + 3 files changed, 49 insertions(+) create mode 100644 src/buildstream/testing/_fixtures.py create mode 100644 src/buildstream/testing/_sourcetests/conftest.py diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py new file mode 100644 index 000000000..862cebe87 --- /dev/null +++ b/src/buildstream/testing/_fixtures.py @@ -0,0 +1,31 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +import psutil +import pytest + +from buildstream import utils + +# Catch tests that don't shut down background threads, which could then lead +# to other tests hanging when BuildStream uses fork(). +@pytest.fixture(autouse=True) +def thread_check(): + # xdist/execnet has its own helper thread. + # Ignore that for `utils._is_single_threaded` checks. + utils._INITIAL_NUM_THREADS_IN_MAIN_PROCESS = psutil.Process().num_threads() + + yield + assert utils._is_single_threaded() diff --git a/src/buildstream/testing/_sourcetests/conftest.py b/src/buildstream/testing/_sourcetests/conftest.py new file mode 100644 index 000000000..f16c1e9ad --- /dev/null +++ b/src/buildstream/testing/_sourcetests/conftest.py @@ -0,0 +1,17 @@ +# +# Copyright (C) 2019 Bloomberg Finance LP +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see . + +from .._fixtures import thread_check # pylint: disable=unused-import diff --git a/tests/conftest.py b/tests/conftest.py index 7728fb5c8..0e39943d3 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,6 +23,7 @@ import os import pytest from buildstream.testing import register_repo_kind, sourcetests_collection_hook +from buildstream.testing._fixtures import thread_check # pylint: disable=unused-import from buildstream.testing._forked import forked_run_report from buildstream.testing.integration import integration_cache # pylint: disable=unused-import -- cgit v1.2.1