 src/buildstream/_cas/cascache.py               |  67
 src/buildstream/_cas/casdprocessmanager.py     |  72
 src/buildstream/_context.py                    |  14
 src/buildstream/_messenger.py                  |  58
 src/buildstream/_remote.py                     |  56
 src/buildstream/_scheduler/_multiprocessing.py |  79
 src/buildstream/_scheduler/jobs/job.py         | 151
 src/buildstream/_scheduler/scheduler.py        |  69
 src/buildstream/downloadablefilesource.py      |   5
 src/buildstream/source.py                      |   8
 src/buildstream/testing/_fixtures.py           |   1
 tests/internals/cascache.py                    |   9
 12 files changed, 208 insertions(+), 381 deletions(-)
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 7936121ea..b4e206370 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -22,11 +22,9 @@
 import itertools
 import os
 import stat
 import contextlib
-import ctypes
-import multiprocessing
-import signal
 import time
 from typing import Optional, List
+import threading
 
 import grpc
 
@@ -34,7 +32,7 @@
 from .._protos.google.rpc import code_pb2
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from .._protos.build.buildgrid import local_cas_pb2
 
-from .. import _signals, utils
+from .. import utils
 from ..types import FastEnum, SourceRef
 from .._exceptions import CASCacheError
 
@@ -93,6 +91,7 @@ class CASCache:
             self._casd_channel = self._casd_process_manager.create_channel()
             self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel)
+            self._cache_usage_monitor.start()
 
     # get_cas():
     #
@@ -132,7 +131,8 @@ class CASCache:
     #
     def release_resources(self, messenger=None):
         if self._cache_usage_monitor:
-            self._cache_usage_monitor.release_resources()
+            self._cache_usage_monitor.stop()
+            self._cache_usage_monitor.join()
 
         if self._casd_process_manager:
             self.close_grpc_channels()
@@ -731,65 +731,42 @@ class _CASCacheUsage:
 # This manages the subprocess that tracks cache usage information via
 # buildbox-casd.
 #
-class _CASCacheUsageMonitor:
+class _CASCacheUsageMonitor(threading.Thread):
     def __init__(self, connection):
+        super().__init__()
         self._connection = connection
-
-        # Shared memory (64-bit signed integer) for current disk usage and quota
-        self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1)
-        self._disk_quota = multiprocessing.Value(ctypes.c_longlong, -1)
-
-        # multiprocessing.Process will fork without exec on Unix.
-        # This can't be allowed with background threads or open gRPC channels.
-        assert utils._is_single_threaded() and connection.is_closed()
-
-        # Block SIGINT, we don't want to kill the process when we interrupt the frontend
-        # and this process if very lightweight.
-        with _signals.blocked([signal.SIGINT], ignore=False):
-            self._subprocess = multiprocessing.Process(target=self._subprocess_run)
-            self._subprocess.start()
+        self._disk_usage = None
+        self._disk_quota = None
+        self._should_stop = False
 
     def get_cache_usage(self):
-        disk_usage = self._disk_usage.value
-        disk_quota = self._disk_quota.value
-
-        if disk_usage < 0:
-            # Disk usage still unknown
-            disk_usage = None
-
-        if disk_quota <= 0:
-            # No disk quota
-            disk_quota = None
-
-        return _CASCacheUsage(disk_usage, disk_quota)
-
-    def release_resources(self):
-        # Simply terminate the subprocess, no cleanup required in the subprocess
-        self._subprocess.terminate()
+        # FIXME: remove this abstraction
+        return _CASCacheUsage(self._disk_usage, self._disk_quota)
 
-    def _subprocess_run(self):
-        # Reset SIGTERM in subprocess to default as no cleanup is necessary
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+    def stop(self):
+        self._should_stop = True
 
-        disk_usage = self._disk_usage
-        disk_quota = self._disk_quota
+    def run(self):
         local_cas = self._connection.get_local_cas()
 
-        while True:
+        while not self._should_stop:
             try:
                 # Ask buildbox-casd for current value
                 request = local_cas_pb2.GetLocalDiskUsageRequest()
                 response = local_cas.GetLocalDiskUsage(request)
 
                 # Update values in shared memory
-                disk_usage.value = response.size_bytes
-                disk_quota.value = response.quota_bytes
+                self._disk_usage = response.size_bytes
+                self._disk_quota = response.quota_bytes
             except grpc.RpcError:
                 # Terminate loop when buildbox-casd becomes unavailable
                 break
 
             # Sleep until next refresh
-            time.sleep(_CACHE_USAGE_REFRESH)
+            for _ in range(_CACHE_USAGE_REFRESH * 10):
+                if self._should_stop:
+                    break
+                time.sleep(0.1)
 
 
 def _grouper(iterable, n):
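The hunk above replaces the fork-based cache usage monitor with a `threading.Thread` that polls buildbox-casd and exits when a `_should_stop` flag is set, sleeping in 0.1-second slices so that `stop()` takes effect promptly. A minimal sketch of the same stoppable-poller pattern, using `threading.Event` in place of the flag-plus-chopped-sleep loop (the names here are illustrative, not part of the patch):

    import threading

    class UsageMonitor(threading.Thread):
        def __init__(self, poll_fn, interval=5.0):
            super().__init__(daemon=True)
            self._poll_fn = poll_fn  # callable returning (usage, quota)
            self._interval = interval
            self._stop_event = threading.Event()
            self.usage = None
            self.quota = None

        def run(self):
            # Event.wait() doubles as an interruptible sleep: it returns True
            # as soon as stop() sets the event, ending the loop immediately
            # instead of after a full polling interval.
            while not self._stop_event.wait(self._interval):
                self.usage, self.quota = self._poll_fn()

        def stop(self):
            self._stop_event.set()

A caller would then mirror `release_resources()` above: `monitor.stop()` followed by `monitor.join()`.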
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 637c4e0b1..634f696ce 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -17,6 +17,7 @@
 #
 
 import contextlib
+import threading
 import os
 import random
 import shutil
@@ -237,33 +238,42 @@ class CASDChannel:
         self._casd_cas = None
         self._local_cas = None
         self._casd_pid = casd_pid
+        self._shutdown_requested = False
 
-    def _establish_connection(self):
-        assert self._casd_channel is None
+        self.lock = threading.Lock()
 
-        while not os.path.exists(self._socket_path):
-            # casd is not ready yet, try again after a 10ms delay,
-            # but don't wait for more than specified timeout period
-            if time.time() > self._start_time + _CASD_TIMEOUT:
-                raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+    def _establish_connection(self):
+        with self.lock:
+            if self._casd_channel is not None:
+                return
 
-            # check that process is still alive
-            try:
-                proc = psutil.Process(self._casd_pid)
-                if proc.status() == psutil.STATUS_ZOMBIE:
-                    proc.wait()
+            while not os.path.exists(self._socket_path):
+                # casd is not ready yet, try again after a 10ms delay,
+                # but don't wait for more than specified timeout period
+                if time.time() > self._start_time + _CASD_TIMEOUT:
+                    raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
 
-                if not proc.is_running():
+                # check that process is still alive
+                try:
+                    proc = psutil.Process(self._casd_pid)
+                    if proc.status() == psutil.STATUS_ZOMBIE:
+                        proc.wait()
+
+                    if not proc.is_running():
+                        if self._shutdown_requested:
+                            return
+                        raise CASCacheError("buildbox-casd process died before connection could be established")
+                except psutil.NoSuchProcess:
+                    if self._shutdown_requested:
+                        return
+                    raise CASCacheError("buildbox-casd process died before connection could be established")
-            except psutil.NoSuchProcess:
-                raise CASCacheError("buildbox-casd process died before connection could be established")
 
-            time.sleep(0.01)
+                time.sleep(0.01)
 
-        self._casd_channel = grpc.insecure_channel(self._connection_string)
-        self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel)
-        self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
-        self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
+            self._casd_channel = grpc.insecure_channel(self._connection_string)
+            self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel)
+            self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
+            self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
 
     # get_cas():
     #
@@ -279,12 +289,12 @@ class CASDChannel:
     # Return LocalCAS stub for buildbox-casd channel.
     #
     def get_local_cas(self):
-        if self._casd_channel is None:
+        if self._local_cas is None:
            self._establish_connection()
        return self._local_cas
 
     def get_bytestream(self):
-        if self._casd_channel is None:
+        if self._bytestream is None:
            self._establish_connection()
        return self._bytestream
 
@@ -300,10 +310,14 @@ class CASDChannel:
     # Close the casd channel.
     #
     def close(self):
-        if self.is_closed():
-            return
-        self._local_cas = None
-        self._casd_cas = None
-        self._bytestream = None
-        self._casd_channel.close()
-        self._casd_channel = None
+        with self.lock:
+            self._shutdown_requested = True
+
+            if self.is_closed():
+                return
+
+            self._local_cas = None
+            self._casd_cas = None
+            self._bytestream = None
+            self._casd_channel.close()
+            self._casd_channel = None
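`_establish_connection()` above now runs entirely under `self.lock` and returns early if another thread already created the channel, so concurrent job threads cannot set up two channels. The general shape is double-checked locking; a hedged sketch with illustrative names, not the real class:

    import threading

    class LazyChannel:
        def __init__(self, factory):
            self._factory = factory  # e.g. a grpc.insecure_channel call
            self._channel = None
            self._lock = threading.Lock()

        def get(self):
            # Cheap unlocked check first; only take the lock when the channel
            # may still need creating, then re-check under the lock.
            if self._channel is None:
                with self._lock:
                    if self._channel is None:
                        self._channel = self._factory()
            return self._channel

The patch additionally re-checks a `_shutdown_requested` flag under the same lock, so that a `close()` racing with connection setup is observed rather than ignored.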
CASCacheError("buildbox-casd process died before connection could be established") - except psutil.NoSuchProcess: - raise CASCacheError("buildbox-casd process died before connection could be established") - time.sleep(0.01) + time.sleep(0.01) - self._casd_channel = grpc.insecure_channel(self._connection_string) - self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel) - self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) - self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) + self._casd_channel = grpc.insecure_channel(self._connection_string) + self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel) + self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel) + self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel) # get_cas(): # @@ -279,12 +289,12 @@ class CASDChannel: # Return LocalCAS stub for buildbox-casd channel. # def get_local_cas(self): - if self._casd_channel is None: + if self._local_cas is None: self._establish_connection() return self._local_cas def get_bytestream(self): - if self._casd_channel is None: + if self._bytestream is None: self._establish_connection() return self._bytestream @@ -300,10 +310,14 @@ class CASDChannel: # Close the casd channel. # def close(self): - if self.is_closed(): - return - self._local_cas = None - self._casd_cas = None - self._bytestream = None - self._casd_channel.close() - self._casd_channel = None + with self.lock: + self._shutdown_requested = True + + if self.is_closed(): + return + + self._local_cas = None + self._casd_cas = None + self._bytestream = None + self._casd_channel.close() + self._casd_channel = None diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py index c0e92b98e..ef68d9556 100644 --- a/src/buildstream/_context.py +++ b/src/buildstream/_context.py @@ -549,17 +549,3 @@ class Context: log_directory=self.logdir, ) return self._cascache - - # prepare_fork(): - # - # Prepare this process for fork without exec. This is a safeguard against - # fork issues with multiple threads and gRPC connections. - # - def prepare_fork(self): - # gRPC channels must be closed before fork. - for cache in [self._cascache, self._artifactcache, self._sourcecache]: - if cache: - cache.close_grpc_channels() - - # Do not allow fork if there are background threads. - return utils._is_single_threaded() diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index 3a32a2467..222b05d17 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -19,10 +19,10 @@ import os import datetime +import threading from contextlib import contextmanager from . import _signals -from . 
import utils from ._exceptions import BstError from ._message import Message, MessageType @@ -48,15 +48,17 @@ class _TimeData: class Messenger: def __init__(self): - self._message_handler = None - self._silence_scope_depth = 0 - self._log_handle = None - self._log_filename = None self._state = None self._next_render = None # A Time object self._active_simple_tasks = 0 self._render_status_cb = None + self._locals = threading.local() + self._locals.message_handler = None + self._locals.log_handle = None + self._locals.log_filename = None + self._locals.silence_scope_depth = 0 + # set_message_handler() # # Sets the handler for any status messages propagated through @@ -70,7 +72,7 @@ class Messenger: # ) -> None # def set_message_handler(self, handler): - self._message_handler = handler + self._locals.message_handler = handler # set_state() # @@ -101,7 +103,7 @@ class Messenger: # (bool): Whether messages are currently being silenced # def _silent_messages(self): - return self._silence_scope_depth > 0 + return self._locals.silence_scope_depth > 0 # message(): # @@ -112,16 +114,15 @@ class Messenger: # message: A Message object # def message(self, message): - # If we are recording messages, dump a copy into the open log file. self._record_message(message) # Send it off to the log handler (can be the frontend, # or it can be the child task which will propagate # to the frontend) - assert self._message_handler + assert self._locals.message_handler - self._message_handler(message, is_silenced=self._silent_messages()) + self._locals.message_handler(message, is_silenced=self._silent_messages()) # silence() # @@ -141,12 +142,12 @@ class Messenger: yield return - self._silence_scope_depth += 1 + self._locals.silence_scope_depth += 1 try: yield finally: - assert self._silence_scope_depth > 0 - self._silence_scope_depth -= 1 + assert self._locals.silence_scope_depth > 0 + self._locals.silence_scope_depth -= 1 # timed_activity() # @@ -265,22 +266,21 @@ class Messenger: # @contextmanager def recorded_messages(self, filename, logdir): - # We dont allow recursing in this context manager, and # we also do not allow it in the main process. - assert self._log_handle is None - assert self._log_filename is None - assert not utils._is_main_process() + assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None + assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None # Create the fully qualified logfile in the log directory, # appending the pid and .log extension at the end. 
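The Messenger change moves per-task state (message handler, log handle and filename, silence depth) into `threading.local()`, so each job thread records to its own log without locking. One subtlety the patch handles with `hasattr()` guards: attributes assigned on `_locals` in `__init__` exist only for the thread that ran `__init__`; a fresh worker thread starts with an empty `threading.local`. A small demonstration of that behavior (illustrative, not from the patch):

    import threading

    tls = threading.local()
    tls.handler = print  # set on the main thread only

    def worker():
        # The worker thread sees no 'handler' attribute at all, hence the
        # getattr/hasattr guards in the patched recorded_messages().
        print(getattr(tls, "handler", None))  # -> None

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    print(tls.handler)  # -> <built-in function print>; main thread unaffected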
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
index d8b8e68fe..6d52ff56a 100644
--- a/src/buildstream/_remote.py
+++ b/src/buildstream/_remote.py
@@ -16,6 +16,7 @@
 #
 
 import os
+import threading
 from collections import namedtuple
 from urllib.parse import urlparse
 
@@ -146,41 +147,44 @@ class BaseRemote:
         self.push = spec.push
         self.url = spec.url
 
+        self._lock = threading.Lock()
+
     # init():
     #
     # Initialize the given remote. This function must be called before
    # any communication is performed, since such will otherwise fail.
     #
     def init(self):
-        if self._initialized:
-            return
-
-        # Set up the communcation channel
-        url = urlparse(self.spec.url)
-        if url.scheme == "http":
-            port = url.port or 80
-            self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port))
-        elif url.scheme == "https":
-            port = url.port or 443
-            try:
-                server_cert, client_key, client_cert = _read_files(
-                    self.spec.server_cert, self.spec.client_key, self.spec.client_cert
+        with self._lock:
+            if self._initialized:
+                return
+
+            # Set up the communcation channel
+            url = urlparse(self.spec.url)
+            if url.scheme == "http":
+                port = url.port or 80
+                self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port))
+            elif url.scheme == "https":
+                port = url.port or 443
+                try:
+                    server_cert, client_key, client_cert = _read_files(
+                        self.spec.server_cert, self.spec.client_key, self.spec.client_cert
+                    )
+                except FileNotFoundError as e:
+                    raise RemoteError("Could not read certificates: {}".format(e)) from e
+                self.server_cert = server_cert
+                self.client_key = client_key
+                self.client_cert = client_cert
+                credentials = grpc.ssl_channel_credentials(
+                    root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert
                 )
-            except FileNotFoundError as e:
-                raise RemoteError("Could not read certificates: {}".format(e)) from e
-            self.server_cert = server_cert
-            self.client_key = client_key
-            self.client_cert = client_cert
-            credentials = grpc.ssl_channel_credentials(
-                root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert
-            )
-            self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials)
-        else:
-            raise RemoteError("Unsupported URL: {}".format(self.spec.url))
+                self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials)
+            else:
+                raise RemoteError("Unsupported URL: {}".format(self.spec.url))
 
-        self._configure_protocols()
+            self._configure_protocols()
 
-        self._initialized = True
+            self._initialized = True
 
     def __enter__(self):
         return self
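`BaseRemote.init()` follows the same discipline: the `_initialized` flag is only read and written inside `self._lock`, so a second thread entering `init()` blocks until the first finishes and then returns without re-creating the channel. A stripped-down sketch of this lock-guarded one-time initialization (illustrative names, not the real class):

    import threading

    class Remote:
        def __init__(self):
            self._initialized = False
            self._lock = threading.Lock()

        def init(self):
            # Reading the flag under the lock means a racing caller waits
            # for setup to finish rather than repeating it.
            with self._lock:
                if self._initialized:
                    return
                self._setup()  # stand-in for channel/credential creation
                self._initialized = True

        def _setup(self):
            pass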
diff --git a/src/buildstream/_scheduler/_multiprocessing.py b/src/buildstream/_scheduler/_multiprocessing.py
deleted file mode 100644
index 4864e140c..000000000
--- a/src/buildstream/_scheduler/_multiprocessing.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-#  Copyright (C) 2019 Bloomberg Finance LP
-#
-#  This program is free software; you can redistribute it and/or
-#  modify it under the terms of the GNU Lesser General Public
-#  License as published by the Free Software Foundation; either
-#  version 2 of the License, or (at your option) any later version.
-#
-#  This library is distributed in the hope that it will be useful,
-#  but WITHOUT ANY WARRANTY; without even the implied warranty of
-#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-#  Lesser General Public License for more details.
-#
-#  You should have received a copy of the GNU Lesser General Public
-#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-
-# TLDR:
-# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process`
-#
-#
-# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running.
-#
-# The main problem that affects us is that the parent and the child will share some file handlers.
-# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received
-# by the app so that the asyncio loop can treat them afterwards.
-#
-# This sharing means that when we send a signal to the child, the sighandler in the child will write
-# it back to the parent sig_handler_fd, making the parent have to treat it too.
-# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children,
-# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to
-# the children...
-#
-# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process that automatically
-# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers.
-#
-#
-# Relevant issues:
-#  - Asyncio: support fork (https://bugs.python.org/issue21998)
-#  - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087)
-#  - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489)
-#
-
-import multiprocessing
-import signal
-import sys
-from asyncio import set_event_loop_policy
-
-
-# _AsyncioSafeForkAwareProcess()
-#
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-#
-# Also automatically close any running asyncio loop before calling
-# the actual run target
-#
-class _AsyncioSafeForkAwareProcess(multiprocessing.Process):
-    # pylint: disable=attribute-defined-outside-init
-    def start(self):
-        self._popen = self._Popen(self)
-        self._sentinel = self._popen.sentinel
-
-    def run(self):
-        signal.set_wakeup_fd(-1)
-        set_event_loop_policy(None)
-
-        super().run()
-
-
-if sys.platform != "win32":
-    # Set the default event loop policy to automatically close our asyncio loop in child processes
-    AsyncioSafeProcess = _AsyncioSafeForkAwareProcess
-
-else:
-    # Windows doesn't support ChildWatcher that way anyways, we'll need another
-    # implementation if we want it
-    AsyncioSafeProcess = multiprocessing.Process
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 7ea87dc62..a4ace4187 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,17 +24,13 @@
 import asyncio
 import datetime
 import multiprocessing
-import os
-import signal
-import sys
 import traceback
 
 # BuildStream toplevel imports
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
 from ...types import FastEnum
-from ... import _signals, utils
-from .. import _multiprocessing
+from ... import _signals
 
 
 # Return code values shutdown of job handling child processes
@@ -130,7 +126,6 @@ class Job:
         self._scheduler = scheduler  # The scheduler
         self._messenger = self._scheduler.context.messenger
         self._pipe_r = None  # The read end of a pipe for message passing
-        self._process = None  # The Process object
         self._listening = False  # Whether the parent is currently listening
         self._suspended = False  # Whether this job is currently suspended
         self._max_retries = max_retries  # Maximum number of automatic retries
@@ -143,6 +138,8 @@ class Job:
         self._message_element_key = None  # The element key for messaging
         self._element = None  # The Element() passed to the Job() constructor, if applicable
 
+        self._task = None  # The task that is run
+
     # set_name()
     #
     # Sets the name of this job
@@ -157,11 +154,13 @@ class Job:
 
         assert not self._terminated, "Attempted to start process which was already terminated"
 
+        # FIXME: remove this, this is not necessary when using asyncio
         self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
 
         self._tries += 1
         self._parent_start_listening()
 
+        # FIXME: remove the parent/child separation, it's not needed anymore.
         child_job = self.create_child_job(  # pylint: disable=assignment-from-no-return
             self.action_name,
             self._messenger,
@@ -173,26 +172,18 @@ class Job:
             self._message_element_key,
         )
 
-        self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[pipe_w],)
-
-        # Block signals which are handled in the main process such that
-        # the child process does not inherit the parent's state, but the main
-        # process will be notified of any signal after we launch the child.
-        #
-        with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
-            with asyncio.get_child_watcher() as watcher:
-                self._process.start()
-                # Register the process to call `_parent_child_completed` once it is done
-
-                # Close the write end of the pipe in the parent
-                pipe_w.close()
+        loop = asyncio.get_event_loop()
 
-        # Here we delay the call to the next loop tick. This is in order to be running
-        # in the main thread, as the callback itself must be thread safe.
-        def on_completion(pid, returncode):
-            asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode)
+        async def execute():
+            try:
+                result = await loop.run_in_executor(None, child_job.child_action, pipe_w)
+            except asyncio.CancelledError:
+                result = _ReturnCode.TERMINATED
+            except Exception:  # pylint: disable=broad-except
+                result = _ReturnCode.FAIL
 
+            await self._parent_child_completed(result)
-        watcher.add_child_handler(self._process.pid, on_completion)
+
+        self._task = loop.create_task(execute())
 
     # terminate()
     #
@@ -201,18 +192,14 @@ class Job:
    # This will send a SIGTERM signal to the Job process.
     #
     def terminate(self):
-
-        # First resume the job if it's suspended
-        self.resume(silent=True)
-
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
         # Make sure there is no garbage on the pipe
         self._parent_stop_listening()
 
         # Terminate the process using multiprocessing API pathway
-        if self._process:
-            self._process.terminate()
+        if self._task:
+            self._task.cancel()
 
         self._terminated = True
 
@@ -226,51 +213,6 @@ class Job:
     def get_terminated(self):
         return self._terminated
 
-    # kill()
-    #
-    # Forcefully kill the process, and any children it might have.
-    #
-    def kill(self):
-        # Force kill
-        self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name))
-        if self._process:
-            utils._kill_process_tree(self._process.pid)
-
-    # suspend()
-    #
-    # Suspend this job.
-    #
-    def suspend(self):
-        if not self._suspended:
-            self.message(MessageType.STATUS, "{} suspending".format(self.action_name))
-
-            try:
-                # Use SIGTSTP so that child processes may handle and propagate
-                # it to processes they start that become session leaders.
-                os.kill(self._process.pid, signal.SIGTSTP)
-
-                # For some reason we receive exactly one suspend event for
-                # every SIGTSTP we send to the child process, even though the
-                # child processes are setsid(). We keep a count of these so we
-                # can ignore them in our event loop suspend_event().
-                self._scheduler.internal_stops += 1
-                self._suspended = True
-            except ProcessLookupError:
-                # ignore, process has already exited
-                pass
-
-    # resume()
-    #
-    # Resume this suspended job.
-    #
-    def resume(self, silent=False):
-        if self._suspended:
-            if not silent and not self._scheduler.terminated:
-                self.message(MessageType.STATUS, "{} resuming".format(self.action_name))
-
-            os.kill(self._process.pid, signal.SIGCONT)
-            self._suspended = False
-
     # set_message_element_name()
     #
     # This is called by Job subclasses to set the plugin instance element
@@ -397,10 +339,9 @@ class Job:
     # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
     #
     # Args:
-    #    pid (int): The PID of the child which completed
     #    returncode (int): The return code of the child process
     #
-    def _parent_child_completed(self, pid, returncode):
+    async def _parent_child_completed(self, returncode):
         self._parent_shutdown()
 
         try:
@@ -431,16 +372,16 @@ class Job:
                 status = JobStatus.FAIL
         elif returncode == _ReturnCode.TERMINATED:
             if self._terminated:
-                self.message(MessageType.INFO, "Process was terminated")
+                self.message(MessageType.INFO, "Job terminated")
             else:
-                self.message(MessageType.ERROR, "Process was terminated unexpectedly")
+                self.message(MessageType.ERROR, "Job was terminated unexpectedly")
 
             status = JobStatus.FAIL
         elif returncode == _ReturnCode.KILLED:
             if self._terminated:
-                self.message(MessageType.INFO, "Process was killed")
+                self.message(MessageType.INFO, "Job was killed")
             else:
-                self.message(MessageType.ERROR, "Process was killed unexpectedly")
+                self.message(MessageType.ERROR, "Job was killed unexpectedly")
 
             status = JobStatus.FAIL
         else:
@@ -451,7 +392,7 @@ class Job:
 
         # Force the deletion of the pipe and process objects to try and clean up FDs
         self._pipe_r.close()
-        self._pipe_r = self._process = None
+        self._pipe_r = self._task = None
 
     # _parent_process_envelope()
     #
@@ -655,19 +596,6 @@ class ChildJob:
     #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
     #
     def child_action(self, pipe_w):
-
-        # This avoids some SIGTSTP signals from grandchildren
-        # getting propagated up to the master process
-        os.setsid()
-
-        # First set back to the default signal handlers for the signals
-        # we handle, and then clear their blocked state.
-        #
-        signal_list = [signal.SIGTSTP, signal.SIGTERM]
-        for sig in signal_list:
-            signal.signal(sig, signal.SIG_DFL)
-        signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
-
         # Assign the pipe we passed across the process boundaries
         #
         # Set the global message handler in this child
@@ -687,16 +615,11 @@ class ChildJob:
             nonlocal starttime
             starttime += datetime.datetime.now() - stopped_time
 
-        # Graciously handle sigterms.
-        def handle_sigterm():
-            self._child_shutdown(_ReturnCode.TERMINATED)
-
         # Time, log and and run the action function
         #
-        with _signals.terminator(handle_sigterm), _signals.suspendable(
-            stop_time, resume_time
-        ), self._messenger.recorded_messages(self._logfile, self._logdir) as filename:
-
+        with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages(
+            self._logfile, self._logdir
+        ) as filename:
             self.message(MessageType.START, self.action_name, logfile=filename)
 
             try:
@@ -707,7 +630,7 @@ class ChildJob:
                 self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
 
                 # Alert parent of skip by return code
-                self._child_shutdown(_ReturnCode.SKIPPED)
+                return _ReturnCode.SKIPPED
             except BstError as e:
                 elapsed = datetime.datetime.now() - starttime
                 retry_flag = e.temporary
@@ -731,7 +654,7 @@ class ChildJob:
 
                 # Set return code based on whether or not the error was temporary.
                 #
-                self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL)
+                return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
 
             except Exception:  # pylint: disable=broad-except
 
@@ -744,7 +667,7 @@ class ChildJob:
                 self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
 
                 # Unhandled exceptions should permenantly fail
-                self._child_shutdown(_ReturnCode.PERM_FAIL)
+                return _ReturnCode.PERM_FAIL
 
             else:
                 # No exception occurred in the action
@@ -757,7 +680,9 @@ class ChildJob:
                 # Shutdown needs to stay outside of the above context manager,
                 # make sure we dont try to handle SIGTERM while the process
                 # is already busy in sys.exit()
-                self._child_shutdown(_ReturnCode.OK)
+                return _ReturnCode.OK
+            finally:
+                self._pipe_w.close()
 
     #######################################################
     #                  Local Private Methods              #
@@ -809,18 +734,6 @@ class ChildJob:
         if result is not None:
             self._send_message(_MessageType.RESULT, result)
 
-    # _child_shutdown()
-    #
-    # Shuts down the child process by cleaning up and exiting the process
-    #
-    # Args:
-    #    exit_code (_ReturnCode): The exit code to exit with
-    #
-    def _child_shutdown(self, exit_code):
-        self._pipe_w.close()
-        assert isinstance(exit_code, _ReturnCode)
-        sys.exit(exit_code.value)
-
     # _child_message_handler()
     #
     # A Context delegate for handling messages, this replaces the
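This is the core of the job.py change: instead of forking an `AsyncioSafeProcess`, `child_action()` now runs on a worker thread via `loop.run_in_executor()`, reports a `_ReturnCode` instead of calling `sys.exit()`, and `terminate()` becomes `task.cancel()`. A self-contained sketch of the same pattern (illustrative names, not the patch's code):

    import asyncio

    def blocking_job():
        # Stand-in for ChildJob.child_action(): blocking work on a worker
        # thread, reporting its outcome as a return code.
        return 0

    async def _execute(loop):
        try:
            # The blocking call runs in the loop's default executor; the
            # event loop stays free to service other jobs meanwhile.
            return await loop.run_in_executor(None, blocking_job)
        except asyncio.CancelledError:
            return "terminated"  # analogous to _ReturnCode.TERMINATED

    async def run_job():
        loop = asyncio.get_running_loop()
        task = loop.create_task(_execute(loop))
        # task.cancel() here would surface as CancelledError in _execute()
        return await task

    print(asyncio.run(run_job()))

One caveat, which the comment hints at: cancelling the task stops *waiting* for the worker thread, it does not stop the thread itself, which is why the patch leans on the `_signals` terminator/suspendable stacks for cooperative shutdown.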
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 3e6bf1f92..7380f07e8 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -20,12 +20,14 @@
 #        Jürg Billeter <juerg.billeter@codethink.co.uk>
 
 # System imports
+import functools
 import os
 import asyncio
 from itertools import chain
 import signal
 import datetime
 import sys
+from concurrent.futures import ThreadPoolExecutor
 
 # Local imports
 from .resources import Resources
@@ -34,9 +36,7 @@
 from ..types import FastEnum
 from .._profile import Topics, PROFILER
 from .._message import Message, MessageType
 from ..plugin import Plugin
-
-
-_MAX_TIMEOUT_TO_KILL_CHILDREN = 20  # in seconds
+from .. import _signals
 
 
 # A decent return code for Scheduler.run()
@@ -46,6 +46,23 @@ class SchedStatus(FastEnum):
     TERMINATED = 1
 
 
+def reset_signals_on_exit(func):
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        orig_sigint = signal.getsignal(signal.SIGINT)
+        orig_sigterm = signal.getsignal(signal.SIGTERM)
+        orig_sigtstp = signal.getsignal(signal.SIGTSTP)
+
+        try:
+            return func(*args, **kwargs)
+        finally:
+            signal.signal(signal.SIGINT, orig_sigint)
+            signal.signal(signal.SIGTERM, orig_sigterm)
+            signal.signal(signal.SIGTSTP, orig_sigtstp)
+
+    return wrapper
+
+
 # Scheduler()
 #
 # The scheduler operates on a list queues, each of which is meant to accomplish
@@ -79,7 +96,6 @@ class Scheduler:
 
         # These are shared with the Job, but should probably be removed or made private in some way.
         self.loop = None  # Shared for Job access to observe the message queue
-        self.internal_stops = 0  # Amount of SIGSTP signals we've introduced, this is shared with job.py
 
         #
         # Private members
@@ -113,6 +129,7 @@ class Scheduler:
     #    elements have been processed by each queue or when
     #    an error arises
     #
+    @reset_signals_on_exit
     def run(self, queues, casd_process_manager):
 
         # Hold on to the queues to process
@@ -149,10 +166,14 @@ class Scheduler:
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
-            # Run the queues
-            self._sched()
-            self.loop.run_forever()
-            self.loop.close()
+            # FIXME: this should be done in a cleaner way
+            with _signals.suspendable(lambda: None, lambda: None), _signals.terminator(lambda: None):
+                with ThreadPoolExecutor(max_workers=sum(self.resources._max_resources.values())) as pool:
+                    self.loop.set_default_executor(pool)
+                    # Run the queues
+                    self._sched()
+                    self.loop.run_forever()
+                    self.loop.close()
 
         # Stop watching casd
         _watcher.remove_child_handler(self._casd_process.pid)
@@ -348,13 +369,6 @@ class Scheduler:
         #       If that happens, do another round.
         process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        # Make sure fork is allowed before starting jobs
-        if not self.context.prepare_fork():
-            message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
-            self.context.messenger.message(message)
-            self.terminate()
-            return
-
         # Start the jobs
         #
         for job in ready:
@@ -412,9 +426,9 @@ class Scheduler:
         if not self.suspended:
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
-            # Notify that we're suspended
-            for job in self._active_jobs:
-                job.suspend()
+
+            for suspender in reversed(_signals.suspendable_stack):
+                suspender.suspend()
 
     # _resume_jobs()
     #
@@ -422,8 +436,9 @@ class Scheduler:
     #
     def _resume_jobs(self):
         if self.suspended:
-            for job in self._active_jobs:
-                job.resume()
+            for suspender in _signals.suspendable_stack:
+                suspender.resume()
+
             self.suspended = False
             # Notify that we're unsuspended
             self._state.offset_start_time(datetime.datetime.now() - self._suspendtime)
@@ -455,12 +470,6 @@ class Scheduler:
     # A loop registered event callback for SIGTSTP
     #
     def _suspend_event(self):
-
-        # Ignore the feedback signals from Job.suspend()
-        if self.internal_stops:
-            self.internal_stops -= 1
-            return
-
         # No need to care if jobs were suspended or not, we _only_ handle this
         # while we know jobs are not suspended.
         self._suspend_jobs()
@@ -482,12 +491,8 @@ class Scheduler:
             self.loop.remove_signal_handler(signal.SIGTERM)
 
     def _terminate_jobs_real(self):
-        def kill_jobs():
-            for job_ in self._active_jobs:
-                job_.kill()
-
-        # Schedule all jobs to be killed if they have not exited after timeout
-        self.loop.call_later(_MAX_TIMEOUT_TO_KILL_CHILDREN, kill_jobs)
+        for terminator in _signals.terminator_stack.copy():
+            terminator()
 
         for job in self._active_jobs:
            job.terminate()
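scheduler.py now sizes a `ThreadPoolExecutor` from the resource limits and installs it with `loop.set_default_executor()`, so every `run_in_executor(None, ...)` call in job.py lands in that pool, and `run()` is wrapped in `reset_signals_on_exit` because signal handlers are process-global and must be restored once the scheduler returns. A minimal sketch of the save-and-restore idea behind that decorator (simplified to one signal, illustrative only):

    import signal

    def with_sigint_restored(func):
        def wrapper(*args, **kwargs):
            orig = signal.getsignal(signal.SIGINT)
            try:
                return func(*args, **kwargs)
            finally:
                # Reinstall the original handler even if func() raises
                signal.signal(signal.SIGINT, orig)
        return wrapper

    @with_sigint_restored
    def run():
        signal.signal(signal.SIGINT, lambda signum, frame: None)

    run()
    print(signal.getsignal(signal.SIGINT) is signal.default_int_handler)
    # -> True in a fresh interpreter: the temporary handler was undone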
diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index 7c2da1c02..b9ca91945 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -259,6 +259,11 @@ class DownloadableFileSource(Source):
 
         return self.__default_mirror_file
 
+    @classmethod
+    def _reset_url_opener(cls):
+        # Needed for tests, in order to cleanup the `netrc` configuration.
+        cls.__urlopener = None
+
     def __get_urlopener(self):
         if not DownloadableFileSource.__urlopener:
             try:
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index f15d5a628..84315904f 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -990,19 +990,11 @@ class Source(Plugin):
         clean = node.strip_node_info()
         to_modify = node.strip_node_info()
 
-        current_ref = self.get_ref()  # pylint: disable=assignment-from-no-return
-
         # Set the ref regardless of whether it changed, the
         # TrackQueue() will want to update a specific node with
         # the ref, regardless of whether the original has changed.
         self.set_ref(new_ref, to_modify)
 
-        if current_ref == new_ref or not save:
-            # Note: We do not look for and propagate changes at this point
-            # which might result in desync depending if something changes about
-            # tracking in the future. For now, this is quite safe.
-            return False
-
         actions = {}
         for k, v in clean.items():
             if k not in to_modify:
diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py
index 5d1c1d227..520f68587 100644
--- a/src/buildstream/testing/_fixtures.py
+++ b/src/buildstream/testing/_fixtures.py
@@ -42,3 +42,4 @@ def thread_check(default_thread_number):
 @pytest.fixture(autouse=True)
 def reset_global_node_state():
     node._reset_global_state()
+    DownloadableFileSource._reset_url_opener()
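These three files work together: `_reset_url_opener()` clears `DownloadableFileSource`'s class-level urlopener cache (which captures `netrc` configuration), and the autouse `reset_global_node_state` fixture calls it so that state cannot leak between tests now that everything shares one process. A hedged sketch of that fixture pattern (simplified class, not the real one):

    import pytest

    class FileSource:
        _urlopener = None  # class-level cache shared across tests

        @classmethod
        def _reset_url_opener(cls):
            cls._urlopener = None

    @pytest.fixture(autouse=True)
    def reset_url_opener():
        # autouse: runs around every test in scope, so a cached opener
        # from one test is never observed by the next.
        yield
        FileSource._reset_url_opener()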
diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py
index 043531c24..e27e40974 100644
--- a/tests/internals/cascache.py
+++ b/tests/internals/cascache.py
@@ -3,6 +3,7 @@ import time
 from unittest.mock import MagicMock
 
 from buildstream._cas.cascache import CASCache
+from buildstream._cas import casdprocessmanager
 from buildstream._message import MessageType
 from buildstream._messenger import Messenger
 
@@ -31,6 +32,10 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch):
     dummy_buildbox_casd.write_text("#!/usr/bin/env sh\nwhile :\ndo\nsleep 60\ndone")
     dummy_buildbox_casd.chmod(0o777)
     monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+    # FIXME: this is a hack, we should instead have a socket be created nicely
+    #        on the fake casd script. This whole test suite probably would
+    #        need some cleanup
+    monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
 
     messenger = MagicMock(spec_set=Messenger)
     cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))
@@ -50,6 +55,10 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch):
     dummy_buildbox_casd.write_text("#!/usr/bin/env sh\ntrap 'echo hello' TERM\nwhile :\ndo\nsleep 60\ndone")
     dummy_buildbox_casd.chmod(0o777)
     monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+    # FIXME: this is a hack, we should instead have a socket be created nicely
+    #        on the fake casd script. This whole test suite probably would
+    #        need some cleanup
+    monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
 
     messenger = MagicMock(spec_set=Messenger)
     cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))