summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <contact@benschubert.me>2020-07-03 12:57:06 +0000
committerBenjamin Schubert <contact@benschubert.me>2020-07-07 17:59:44 +0100
commitad9073aac71783eefa53eccec47aba6f3b76367e (patch)
tree3c592e1bfb3a2777597bc336b556ae0be09d4deb
parent4c03165bae69a027bd81bffe5fb33e03caa64cdf (diff)
downloadbuildstream-ad9073aac71783eefa53eccec47aba6f3b76367e.tar.gz
scheduler.py: Use threads instead of processes for jobs
This changes how the scheduler works and adapts all the code that needs adapting in order to be able to run in threads instead of in subprocesses, which helps with Windows support, and will allow some simplifications in the main pipeline.
-rw-r--r--src/buildstream/_cas/cascache.py67
-rw-r--r--src/buildstream/_cas/casdprocessmanager.py72
-rw-r--r--src/buildstream/_context.py14
-rw-r--r--src/buildstream/_messenger.py58
-rw-r--r--src/buildstream/_remote.py56
-rw-r--r--src/buildstream/_scheduler/_multiprocessing.py79
-rw-r--r--src/buildstream/_scheduler/jobs/job.py151
-rw-r--r--src/buildstream/_scheduler/scheduler.py69
-rw-r--r--src/buildstream/downloadablefilesource.py5
-rw-r--r--src/buildstream/source.py8
-rw-r--r--src/buildstream/testing/_fixtures.py1
-rw-r--r--tests/internals/cascache.py9
12 files changed, 208 insertions, 381 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 7936121ea..b4e206370 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -22,11 +22,9 @@ import itertools
import os
import stat
import contextlib
-import ctypes
-import multiprocessing
-import signal
import time
from typing import Optional, List
+import threading
import grpc
@@ -34,7 +32,7 @@ from .._protos.google.rpc import code_pb2
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .._protos.build.buildgrid import local_cas_pb2
-from .. import _signals, utils
+from .. import utils
from ..types import FastEnum, SourceRef
from .._exceptions import CASCacheError
@@ -93,6 +91,7 @@ class CASCache:
self._casd_channel = self._casd_process_manager.create_channel()
self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel)
+ self._cache_usage_monitor.start()
# get_cas():
#
@@ -132,7 +131,8 @@ class CASCache:
#
def release_resources(self, messenger=None):
if self._cache_usage_monitor:
- self._cache_usage_monitor.release_resources()
+ self._cache_usage_monitor.stop()
+ self._cache_usage_monitor.join()
if self._casd_process_manager:
self.close_grpc_channels()
@@ -731,65 +731,42 @@ class _CASCacheUsage:
# This manages the subprocess that tracks cache usage information via
# buildbox-casd.
#
-class _CASCacheUsageMonitor:
+class _CASCacheUsageMonitor(threading.Thread):
def __init__(self, connection):
+ super().__init__()
self._connection = connection
-
- # Shared memory (64-bit signed integer) for current disk usage and quota
- self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1)
- self._disk_quota = multiprocessing.Value(ctypes.c_longlong, -1)
-
- # multiprocessing.Process will fork without exec on Unix.
- # This can't be allowed with background threads or open gRPC channels.
- assert utils._is_single_threaded() and connection.is_closed()
-
- # Block SIGINT, we don't want to kill the process when we interrupt the frontend
- # and this process if very lightweight.
- with _signals.blocked([signal.SIGINT], ignore=False):
- self._subprocess = multiprocessing.Process(target=self._subprocess_run)
- self._subprocess.start()
+ self._disk_usage = None
+ self._disk_quota = None
+ self._should_stop = False
def get_cache_usage(self):
- disk_usage = self._disk_usage.value
- disk_quota = self._disk_quota.value
-
- if disk_usage < 0:
- # Disk usage still unknown
- disk_usage = None
-
- if disk_quota <= 0:
- # No disk quota
- disk_quota = None
-
- return _CASCacheUsage(disk_usage, disk_quota)
-
- def release_resources(self):
- # Simply terminate the subprocess, no cleanup required in the subprocess
- self._subprocess.terminate()
+ # FIXME: remove this abstraction
+ return _CASCacheUsage(self._disk_usage, self._disk_quota)
- def _subprocess_run(self):
- # Reset SIGTERM in subprocess to default as no cleanup is necessary
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ def stop(self):
+ self._should_stop = True
- disk_usage = self._disk_usage
- disk_quota = self._disk_quota
+ def run(self):
local_cas = self._connection.get_local_cas()
- while True:
+ while not self._should_stop:
try:
# Ask buildbox-casd for current value
request = local_cas_pb2.GetLocalDiskUsageRequest()
response = local_cas.GetLocalDiskUsage(request)
# Update values in shared memory
- disk_usage.value = response.size_bytes
- disk_quota.value = response.quota_bytes
+ self._disk_usage = response.size_bytes
+ self._disk_quota = response.quota_bytes
except grpc.RpcError:
# Terminate loop when buildbox-casd becomes unavailable
break
# Sleep until next refresh
- time.sleep(_CACHE_USAGE_REFRESH)
+ for _ in range(_CACHE_USAGE_REFRESH * 10):
+ if self._should_stop:
+ break
+ time.sleep(0.1)
def _grouper(iterable, n):
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 637c4e0b1..634f696ce 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -17,6 +17,7 @@
#
import contextlib
+import threading
import os
import random
import shutil
@@ -237,33 +238,42 @@ class CASDChannel:
self._casd_cas = None
self._local_cas = None
self._casd_pid = casd_pid
+ self._shutdown_requested = False
- def _establish_connection(self):
- assert self._casd_channel is None
+ self.lock = threading.Lock()
- while not os.path.exists(self._socket_path):
- # casd is not ready yet, try again after a 10ms delay,
- # but don't wait for more than specified timeout period
- if time.time() > self._start_time + _CASD_TIMEOUT:
- raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+ def _establish_connection(self):
+ with self.lock:
+ if self._casd_channel is not None:
+ return
- # check that process is still alive
- try:
- proc = psutil.Process(self._casd_pid)
- if proc.status() == psutil.STATUS_ZOMBIE:
- proc.wait()
+ while not os.path.exists(self._socket_path):
+ # casd is not ready yet, try again after a 10ms delay,
+ # but don't wait for more than specified timeout period
+ if time.time() > self._start_time + _CASD_TIMEOUT:
+ raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
- if not proc.is_running():
+ # check that process is still alive
+ try:
+ proc = psutil.Process(self._casd_pid)
+ if proc.status() == psutil.STATUS_ZOMBIE:
+ proc.wait()
+
+ if not proc.is_running():
+ if self._shutdown_requested:
+ return
+ raise CASCacheError("buildbox-casd process died before connection could be established")
+ except psutil.NoSuchProcess:
+ if self._shutdown_requested:
+ return
raise CASCacheError("buildbox-casd process died before connection could be established")
- except psutil.NoSuchProcess:
- raise CASCacheError("buildbox-casd process died before connection could be established")
- time.sleep(0.01)
+ time.sleep(0.01)
- self._casd_channel = grpc.insecure_channel(self._connection_string)
- self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel)
- self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
- self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
+ self._casd_channel = grpc.insecure_channel(self._connection_string)
+ self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel)
+ self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
+ self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
# get_cas():
#
@@ -279,12 +289,12 @@ class CASDChannel:
# Return LocalCAS stub for buildbox-casd channel.
#
def get_local_cas(self):
- if self._casd_channel is None:
+ if self._local_cas is None:
self._establish_connection()
return self._local_cas
def get_bytestream(self):
- if self._casd_channel is None:
+ if self._bytestream is None:
self._establish_connection()
return self._bytestream
@@ -300,10 +310,14 @@ class CASDChannel:
# Close the casd channel.
#
def close(self):
- if self.is_closed():
- return
- self._local_cas = None
- self._casd_cas = None
- self._bytestream = None
- self._casd_channel.close()
- self._casd_channel = None
+ with self.lock:
+ self._shutdown_requested = True
+
+ if self.is_closed():
+ return
+
+ self._local_cas = None
+ self._casd_cas = None
+ self._bytestream = None
+ self._casd_channel.close()
+ self._casd_channel = None
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index c0e92b98e..ef68d9556 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -549,17 +549,3 @@ class Context:
log_directory=self.logdir,
)
return self._cascache
-
- # prepare_fork():
- #
- # Prepare this process for fork without exec. This is a safeguard against
- # fork issues with multiple threads and gRPC connections.
- #
- def prepare_fork(self):
- # gRPC channels must be closed before fork.
- for cache in [self._cascache, self._artifactcache, self._sourcecache]:
- if cache:
- cache.close_grpc_channels()
-
- # Do not allow fork if there are background threads.
- return utils._is_single_threaded()
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 3a32a2467..222b05d17 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -19,10 +19,10 @@
import os
import datetime
+import threading
from contextlib import contextmanager
from . import _signals
-from . import utils
from ._exceptions import BstError
from ._message import Message, MessageType
@@ -48,15 +48,17 @@ class _TimeData:
class Messenger:
def __init__(self):
- self._message_handler = None
- self._silence_scope_depth = 0
- self._log_handle = None
- self._log_filename = None
self._state = None
self._next_render = None # A Time object
self._active_simple_tasks = 0
self._render_status_cb = None
+ self._locals = threading.local()
+ self._locals.message_handler = None
+ self._locals.log_handle = None
+ self._locals.log_filename = None
+ self._locals.silence_scope_depth = 0
+
# set_message_handler()
#
# Sets the handler for any status messages propagated through
@@ -70,7 +72,7 @@ class Messenger:
# ) -> None
#
def set_message_handler(self, handler):
- self._message_handler = handler
+ self._locals.message_handler = handler
# set_state()
#
@@ -101,7 +103,7 @@ class Messenger:
# (bool): Whether messages are currently being silenced
#
def _silent_messages(self):
- return self._silence_scope_depth > 0
+ return self._locals.silence_scope_depth > 0
# message():
#
@@ -112,16 +114,15 @@ class Messenger:
# message: A Message object
#
def message(self, message):
-
# If we are recording messages, dump a copy into the open log file.
self._record_message(message)
# Send it off to the log handler (can be the frontend,
# or it can be the child task which will propagate
# to the frontend)
- assert self._message_handler
+ assert self._locals.message_handler
- self._message_handler(message, is_silenced=self._silent_messages())
+ self._locals.message_handler(message, is_silenced=self._silent_messages())
# silence()
#
@@ -141,12 +142,12 @@ class Messenger:
yield
return
- self._silence_scope_depth += 1
+ self._locals.silence_scope_depth += 1
try:
yield
finally:
- assert self._silence_scope_depth > 0
- self._silence_scope_depth -= 1
+ assert self._locals.silence_scope_depth > 0
+ self._locals.silence_scope_depth -= 1
# timed_activity()
#
@@ -265,22 +266,21 @@ class Messenger:
#
@contextmanager
def recorded_messages(self, filename, logdir):
-
# We dont allow recursing in this context manager, and
# we also do not allow it in the main process.
- assert self._log_handle is None
- assert self._log_filename is None
- assert not utils._is_main_process()
+ assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
+ assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None
# Create the fully qualified logfile in the log directory,
# appending the pid and .log extension at the end.
- self._log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+ self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+ self._locals.silence_scope_depth = 0
# Ensure the directory exists first
- directory = os.path.dirname(self._log_filename)
+ directory = os.path.dirname(self._locals.log_filename)
os.makedirs(directory, exist_ok=True)
- with open(self._log_filename, "a") as logfile:
+ with open(self._locals.log_filename, "a") as logfile:
# Write one last line to the log and flush it to disk
def flush_log():
@@ -295,12 +295,12 @@ class Messenger:
except RuntimeError:
os.fsync(logfile.fileno())
- self._log_handle = logfile
+ self._locals.log_handle = logfile
with _signals.terminator(flush_log):
- yield self._log_filename
+ yield self._locals.log_filename
- self._log_handle = None
- self._log_filename = None
+ self._locals.log_handle = None
+ self._locals.log_filename = None
# get_log_handle()
#
@@ -312,7 +312,7 @@ class Messenger:
# (file): The active logging file handle, or None
#
def get_log_handle(self):
- return self._log_handle
+ return self._locals.log_handle
# get_log_filename()
#
@@ -324,7 +324,7 @@ class Messenger:
# (str): The active logging filename, or None
#
def get_log_filename(self):
- return self._log_filename
+ return self._locals.log_filename
# _record_message()
#
@@ -335,7 +335,7 @@ class Messenger:
#
def _record_message(self, message):
- if self._log_handle is None:
+ if self._locals.log_handle is None:
return
INDENT = " "
@@ -372,8 +372,8 @@ class Messenger:
)
# Write to the open log file
- self._log_handle.write("{}\n".format(text))
- self._log_handle.flush()
+ self._locals.log_handle.write("{}\n".format(text))
+ self._locals.log_handle.flush()
# _render_status()
#
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
index d8b8e68fe..6d52ff56a 100644
--- a/src/buildstream/_remote.py
+++ b/src/buildstream/_remote.py
@@ -16,6 +16,7 @@
#
import os
+import threading
from collections import namedtuple
from urllib.parse import urlparse
@@ -146,41 +147,44 @@ class BaseRemote:
self.push = spec.push
self.url = spec.url
+ self._lock = threading.Lock()
+
# init():
#
# Initialize the given remote. This function must be called before
# any communication is performed, since such will otherwise fail.
#
def init(self):
- if self._initialized:
- return
-
- # Set up the communcation channel
- url = urlparse(self.spec.url)
- if url.scheme == "http":
- port = url.port or 80
- self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port))
- elif url.scheme == "https":
- port = url.port or 443
- try:
- server_cert, client_key, client_cert = _read_files(
- self.spec.server_cert, self.spec.client_key, self.spec.client_cert
+ with self._lock:
+ if self._initialized:
+ return
+
+ # Set up the communcation channel
+ url = urlparse(self.spec.url)
+ if url.scheme == "http":
+ port = url.port or 80
+ self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port))
+ elif url.scheme == "https":
+ port = url.port or 443
+ try:
+ server_cert, client_key, client_cert = _read_files(
+ self.spec.server_cert, self.spec.client_key, self.spec.client_cert
+ )
+ except FileNotFoundError as e:
+ raise RemoteError("Could not read certificates: {}".format(e)) from e
+ self.server_cert = server_cert
+ self.client_key = client_key
+ self.client_cert = client_cert
+ credentials = grpc.ssl_channel_credentials(
+ root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert
)
- except FileNotFoundError as e:
- raise RemoteError("Could not read certificates: {}".format(e)) from e
- self.server_cert = server_cert
- self.client_key = client_key
- self.client_cert = client_cert
- credentials = grpc.ssl_channel_credentials(
- root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert
- )
- self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials)
- else:
- raise RemoteError("Unsupported URL: {}".format(self.spec.url))
+ self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials)
+ else:
+ raise RemoteError("Unsupported URL: {}".format(self.spec.url))
- self._configure_protocols()
+ self._configure_protocols()
- self._initialized = True
+ self._initialized = True
def __enter__(self):
return self
diff --git a/src/buildstream/_scheduler/_multiprocessing.py b/src/buildstream/_scheduler/_multiprocessing.py
deleted file mode 100644
index 4864e140c..000000000
--- a/src/buildstream/_scheduler/_multiprocessing.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# Copyright (C) 2019 Bloomberg Finance LP
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-
-# TLDR:
-# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process`
-#
-#
-# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running.
-#
-# The main problem that affects us is that the parent and the child will share some file handlers.
-# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received
-# by the app so that the asyncio loop can treat them afterwards.
-#
-# This sharing means that when we send a signal to the child, the sighandler in the child will write
-# it back to the parent sig_handler_fd, making the parent have to treat it too.
-# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children,
-# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to
-# the children...
-#
-# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process that automatically
-# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers.
-#
-#
-# Relevant issues:
-# - Asyncio: support fork (https://bugs.python.org/issue21998)
-# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087)
-# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489)
-#
-#
-
-import multiprocessing
-import signal
-import sys
-from asyncio import set_event_loop_policy
-
-
-# _AsyncioSafeForkAwareProcess()
-#
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-#
-# Also automatically close any running asyncio loop before calling
-# the actual run target
-#
-class _AsyncioSafeForkAwareProcess(multiprocessing.Process):
- # pylint: disable=attribute-defined-outside-init
- def start(self):
- self._popen = self._Popen(self)
- self._sentinel = self._popen.sentinel
-
- def run(self):
- signal.set_wakeup_fd(-1)
- set_event_loop_policy(None)
-
- super().run()
-
-
-if sys.platform != "win32":
- # Set the default event loop policy to automatically close our asyncio loop in child processes
- AsyncioSafeProcess = _AsyncioSafeForkAwareProcess
-
-else:
- # Windows doesn't support ChildWatcher that way anyways, we'll need another
- # implementation if we want it
- AsyncioSafeProcess = multiprocessing.Process
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 7ea87dc62..a4ace4187 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,17 +24,13 @@
import asyncio
import datetime
import multiprocessing
-import os
-import signal
-import sys
import traceback
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
-from ... import _signals, utils
-from .. import _multiprocessing
+from ... import _signals
# Return code values shutdown of job handling child processes
@@ -130,7 +126,6 @@ class Job:
self._scheduler = scheduler # The scheduler
self._messenger = self._scheduler.context.messenger
self._pipe_r = None # The read end of a pipe for message passing
- self._process = None # The Process object
self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
@@ -143,6 +138,8 @@ class Job:
self._message_element_key = None # The element key for messaging
self._element = None # The Element() passed to the Job() constructor, if applicable
+ self._task = None # The task that is run
+
# set_name()
#
# Sets the name of this job
@@ -157,11 +154,13 @@ class Job:
assert not self._terminated, "Attempted to start process which was already terminated"
+ # FIXME: remove this, this is not necessary when using asyncio
self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
self._tries += 1
self._parent_start_listening()
+ # FIXME: remove the parent/child separation, it's not needed anymore.
child_job = self.create_child_job( # pylint: disable=assignment-from-no-return
self.action_name,
self._messenger,
@@ -173,26 +172,18 @@ class Job:
self._message_element_key,
)
- self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[pipe_w],)
-
- # Block signals which are handled in the main process such that
- # the child process does not inherit the parent's state, but the main
- # process will be notified of any signal after we launch the child.
- #
- with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
- with asyncio.get_child_watcher() as watcher:
- self._process.start()
- # Register the process to call `_parent_child_completed` once it is done
-
- # Close the write end of the pipe in the parent
- pipe_w.close()
+ loop = asyncio.get_event_loop()
- # Here we delay the call to the next loop tick. This is in order to be running
- # in the main thread, as the callback itself must be thread safe.
- def on_completion(pid, returncode):
- asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode)
+ async def execute():
+ try:
+ result = await loop.run_in_executor(None, child_job.child_action, pipe_w)
+ except asyncio.CancelledError:
+ result = _ReturnCode.TERMINATED
+ except Exception: # pylint: disable=broad-except
+ result = _ReturnCode.FAIL
+ await self._parent_child_completed(result)
- watcher.add_child_handler(self._process.pid, on_completion)
+ self._task = loop.create_task(execute())
# terminate()
#
@@ -201,18 +192,14 @@ class Job:
# This will send a SIGTERM signal to the Job process.
#
def terminate(self):
-
- # First resume the job if it's suspended
- self.resume(silent=True)
-
self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
# Make sure there is no garbage on the pipe
self._parent_stop_listening()
# Terminate the process using multiprocessing API pathway
- if self._process:
- self._process.terminate()
+ if self._task:
+ self._task.cancel()
self._terminated = True
@@ -226,51 +213,6 @@ class Job:
def get_terminated(self):
return self._terminated
- # kill()
- #
- # Forcefully kill the process, and any children it might have.
- #
- def kill(self):
- # Force kill
- self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name))
- if self._process:
- utils._kill_process_tree(self._process.pid)
-
- # suspend()
- #
- # Suspend this job.
- #
- def suspend(self):
- if not self._suspended:
- self.message(MessageType.STATUS, "{} suspending".format(self.action_name))
-
- try:
- # Use SIGTSTP so that child processes may handle and propagate
- # it to processes they start that become session leaders.
- os.kill(self._process.pid, signal.SIGTSTP)
-
- # For some reason we receive exactly one suspend event for
- # every SIGTSTP we send to the child process, even though the
- # child processes are setsid(). We keep a count of these so we
- # can ignore them in our event loop suspend_event().
- self._scheduler.internal_stops += 1
- self._suspended = True
- except ProcessLookupError:
- # ignore, process has already exited
- pass
-
- # resume()
- #
- # Resume this suspended job.
- #
- def resume(self, silent=False):
- if self._suspended:
- if not silent and not self._scheduler.terminated:
- self.message(MessageType.STATUS, "{} resuming".format(self.action_name))
-
- os.kill(self._process.pid, signal.SIGCONT)
- self._suspended = False
-
# set_message_element_name()
#
# This is called by Job subclasses to set the plugin instance element
@@ -397,10 +339,9 @@ class Job:
# Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
#
# Args:
- # pid (int): The PID of the child which completed
# returncode (int): The return code of the child process
#
- def _parent_child_completed(self, pid, returncode):
+ async def _parent_child_completed(self, returncode):
self._parent_shutdown()
try:
@@ -431,16 +372,16 @@ class Job:
status = JobStatus.FAIL
elif returncode == _ReturnCode.TERMINATED:
if self._terminated:
- self.message(MessageType.INFO, "Process was terminated")
+ self.message(MessageType.INFO, "Job terminated")
else:
- self.message(MessageType.ERROR, "Process was terminated unexpectedly")
+ self.message(MessageType.ERROR, "Job was terminated unexpectedly")
status = JobStatus.FAIL
elif returncode == _ReturnCode.KILLED:
if self._terminated:
- self.message(MessageType.INFO, "Process was killed")
+ self.message(MessageType.INFO, "Job was killed")
else:
- self.message(MessageType.ERROR, "Process was killed unexpectedly")
+ self.message(MessageType.ERROR, "Job was killed unexpectedly")
status = JobStatus.FAIL
else:
@@ -451,7 +392,7 @@ class Job:
# Force the deletion of the pipe and process objects to try and clean up FDs
self._pipe_r.close()
- self._pipe_r = self._process = None
+ self._pipe_r = self._task = None
# _parent_process_envelope()
#
@@ -655,19 +596,6 @@ class ChildJob:
# pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
#
def child_action(self, pipe_w):
-
- # This avoids some SIGTSTP signals from grandchildren
- # getting propagated up to the master process
- os.setsid()
-
- # First set back to the default signal handlers for the signals
- # we handle, and then clear their blocked state.
- #
- signal_list = [signal.SIGTSTP, signal.SIGTERM]
- for sig in signal_list:
- signal.signal(sig, signal.SIG_DFL)
- signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
-
# Assign the pipe we passed across the process boundaries
#
# Set the global message handler in this child
@@ -687,16 +615,11 @@ class ChildJob:
nonlocal starttime
starttime += datetime.datetime.now() - stopped_time
- # Graciously handle sigterms.
- def handle_sigterm():
- self._child_shutdown(_ReturnCode.TERMINATED)
-
# Time, log and and run the action function
#
- with _signals.terminator(handle_sigterm), _signals.suspendable(
- stop_time, resume_time
- ), self._messenger.recorded_messages(self._logfile, self._logdir) as filename:
-
+ with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages(
+ self._logfile, self._logdir
+ ) as filename:
self.message(MessageType.START, self.action_name, logfile=filename)
try:
@@ -707,7 +630,7 @@ class ChildJob:
self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
- self._child_shutdown(_ReturnCode.SKIPPED)
+ return _ReturnCode.SKIPPED
except BstError as e:
elapsed = datetime.datetime.now() - starttime
retry_flag = e.temporary
@@ -731,7 +654,7 @@ class ChildJob:
# Set return code based on whether or not the error was temporary.
#
- self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL)
+ return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
except Exception: # pylint: disable=broad-except
@@ -744,7 +667,7 @@ class ChildJob:
self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
# Unhandled exceptions should permenantly fail
- self._child_shutdown(_ReturnCode.PERM_FAIL)
+ return _ReturnCode.PERM_FAIL
else:
# No exception occurred in the action
@@ -757,7 +680,9 @@ class ChildJob:
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
# is already busy in sys.exit()
- self._child_shutdown(_ReturnCode.OK)
+ return _ReturnCode.OK
+ finally:
+ self._pipe_w.close()
#######################################################
# Local Private Methods #
@@ -809,18 +734,6 @@ class ChildJob:
if result is not None:
self._send_message(_MessageType.RESULT, result)
- # _child_shutdown()
- #
- # Shuts down the child process by cleaning up and exiting the process
- #
- # Args:
- # exit_code (_ReturnCode): The exit code to exit with
- #
- def _child_shutdown(self, exit_code):
- self._pipe_w.close()
- assert isinstance(exit_code, _ReturnCode)
- sys.exit(exit_code.value)
-
# _child_message_handler()
#
# A Context delegate for handling messages, this replaces the
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 3e6bf1f92..7380f07e8 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -20,12 +20,14 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# System imports
+import functools
import os
import asyncio
from itertools import chain
import signal
import datetime
import sys
+from concurrent.futures import ThreadPoolExecutor
# Local imports
from .resources import Resources
@@ -34,9 +36,7 @@ from ..types import FastEnum
from .._profile import Topics, PROFILER
from .._message import Message, MessageType
from ..plugin import Plugin
-
-
-_MAX_TIMEOUT_TO_KILL_CHILDREN = 20 # in seconds
+from .. import _signals
# A decent return code for Scheduler.run()
@@ -46,6 +46,23 @@ class SchedStatus(FastEnum):
TERMINATED = 1
+def reset_signals_on_exit(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ orig_sigint = signal.getsignal(signal.SIGINT)
+ orig_sigterm = signal.getsignal(signal.SIGTERM)
+ orig_sigtstp = signal.getsignal(signal.SIGTSTP)
+
+ try:
+ return func(*args, **kwargs)
+ finally:
+ signal.signal(signal.SIGINT, orig_sigint)
+ signal.signal(signal.SIGTERM, orig_sigterm)
+ signal.signal(signal.SIGTSTP, orig_sigtstp)
+
+ return wrapper
+
+
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -79,7 +96,6 @@ class Scheduler:
# These are shared with the Job, but should probably be removed or made private in some way.
self.loop = None # Shared for Job access to observe the message queue
- self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
#
# Private members
@@ -113,6 +129,7 @@ class Scheduler:
# elements have been processed by each queue or when
# an error arises
#
+ @reset_signals_on_exit
def run(self, queues, casd_process_manager):
# Hold on to the queues to process
@@ -149,10 +166,14 @@ class Scheduler:
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
- # Run the queues
- self._sched()
- self.loop.run_forever()
- self.loop.close()
+ # FIXME: this should be done in a cleaner way
+ with _signals.suspendable(lambda: None, lambda: None), _signals.terminator(lambda: None):
+ with ThreadPoolExecutor(max_workers=sum(self.resources._max_resources.values())) as pool:
+ self.loop.set_default_executor(pool)
+ # Run the queues
+ self._sched()
+ self.loop.run_forever()
+ self.loop.close()
# Stop watching casd
_watcher.remove_child_handler(self._casd_process.pid)
@@ -348,13 +369,6 @@ class Scheduler:
# If that happens, do another round.
process_queues = any(q.dequeue_ready() for q in self.queues)
- # Make sure fork is allowed before starting jobs
- if not self.context.prepare_fork():
- message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
- self.context.messenger.message(message)
- self.terminate()
- return
-
# Start the jobs
#
for job in ready:
@@ -412,9 +426,9 @@ class Scheduler:
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
- # Notify that we're suspended
- for job in self._active_jobs:
- job.suspend()
+
+ for suspender in reversed(_signals.suspendable_stack):
+ suspender.suspend()
# _resume_jobs()
#
@@ -422,8 +436,9 @@ class Scheduler:
#
def _resume_jobs(self):
if self.suspended:
- for job in self._active_jobs:
- job.resume()
+ for suspender in _signals.suspendable_stack:
+ suspender.resume()
+
self.suspended = False
# Notify that we're unsuspended
self._state.offset_start_time(datetime.datetime.now() - self._suspendtime)
@@ -455,12 +470,6 @@ class Scheduler:
# A loop registered event callback for SIGTSTP
#
def _suspend_event(self):
-
- # Ignore the feedback signals from Job.suspend()
- if self.internal_stops:
- self.internal_stops -= 1
- return
-
# No need to care if jobs were suspended or not, we _only_ handle this
# while we know jobs are not suspended.
self._suspend_jobs()
@@ -482,12 +491,8 @@ class Scheduler:
self.loop.remove_signal_handler(signal.SIGTERM)
def _terminate_jobs_real(self):
- def kill_jobs():
- for job_ in self._active_jobs:
- job_.kill()
-
- # Schedule all jobs to be killed if they have not exited after timeout
- self.loop.call_later(_MAX_TIMEOUT_TO_KILL_CHILDREN, kill_jobs)
+ for terminator in _signals.terminator_stack.copy():
+ terminator()
for job in self._active_jobs:
job.terminate()
diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index 7c2da1c02..b9ca91945 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -259,6 +259,11 @@ class DownloadableFileSource(Source):
return self.__default_mirror_file
+ @classmethod
+ def _reset_url_opener(cls):
+ # Needed for tests, in order to cleanup the `netrc` configuration.
+ cls.__urlopener = None
+
def __get_urlopener(self):
if not DownloadableFileSource.__urlopener:
try:
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index f15d5a628..84315904f 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -990,19 +990,11 @@ class Source(Plugin):
clean = node.strip_node_info()
to_modify = node.strip_node_info()
- current_ref = self.get_ref() # pylint: disable=assignment-from-no-return
-
# Set the ref regardless of whether it changed, the
# TrackQueue() will want to update a specific node with
# the ref, regardless of whether the original has changed.
self.set_ref(new_ref, to_modify)
- if current_ref == new_ref or not save:
- # Note: We do not look for and propagate changes at this point
- # which might result in desync depending if something changes about
- # tracking in the future. For now, this is quite safe.
- return False
-
actions = {}
for k, v in clean.items():
if k not in to_modify:
diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py
index 5d1c1d227..520f68587 100644
--- a/src/buildstream/testing/_fixtures.py
+++ b/src/buildstream/testing/_fixtures.py
@@ -42,3 +42,4 @@ def thread_check(default_thread_number):
@pytest.fixture(autouse=True)
def reset_global_node_state():
node._reset_global_state()
+ DownloadableFileSource._reset_url_opener()
diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py
index 043531c24..e27e40974 100644
--- a/tests/internals/cascache.py
+++ b/tests/internals/cascache.py
@@ -3,6 +3,7 @@ import time
from unittest.mock import MagicMock
from buildstream._cas.cascache import CASCache
+from buildstream._cas import casdprocessmanager
from buildstream._message import MessageType
from buildstream._messenger import Messenger
@@ -31,6 +32,10 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch):
dummy_buildbox_casd.write_text("#!/usr/bin/env sh\nwhile :\ndo\nsleep 60\ndone")
dummy_buildbox_casd.chmod(0o777)
monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+ # FIXME: this is a hack, we should instead have a socket be created nicely
+ # on the fake casd script. This whole test suite probably would
+ # need some cleanup
+ monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
messenger = MagicMock(spec_set=Messenger)
cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))
@@ -50,6 +55,10 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch):
dummy_buildbox_casd.write_text("#!/usr/bin/env sh\ntrap 'echo hello' TERM\nwhile :\ndo\nsleep 60\ndone")
dummy_buildbox_casd.chmod(0o777)
monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+ # FIXME: this is a hack, we should instead have a socket be created nicely
+ # on the fake casd script. This whole test suite probably would
+ # need some cleanup
+ monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
messenger = MagicMock(spec_set=Messenger)
cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))