summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/buildstream/_cas/cascache.py67
-rw-r--r--src/buildstream/_cas/casdprocessmanager.py72
-rw-r--r--src/buildstream/_context.py14
-rw-r--r--src/buildstream/_messenger.py58
-rw-r--r--src/buildstream/_remote.py56
-rw-r--r--src/buildstream/_scheduler/_multiprocessing.py79
-rw-r--r--src/buildstream/_scheduler/jobs/job.py151
-rw-r--r--src/buildstream/_scheduler/scheduler.py69
-rw-r--r--src/buildstream/downloadablefilesource.py5
-rw-r--r--src/buildstream/source.py8
-rw-r--r--src/buildstream/testing/_fixtures.py1
-rw-r--r--tests/internals/cascache.py9
12 files changed, 208 insertions, 381 deletions
diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 7936121ea..b4e206370 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -22,11 +22,9 @@ import itertools
import os
import stat
import contextlib
-import ctypes
-import multiprocessing
-import signal
import time
from typing import Optional, List
+import threading
import grpc
@@ -34,7 +32,7 @@ from .._protos.google.rpc import code_pb2
from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from .._protos.build.buildgrid import local_cas_pb2
-from .. import _signals, utils
+from .. import utils
from ..types import FastEnum, SourceRef
from .._exceptions import CASCacheError
@@ -93,6 +91,7 @@ class CASCache:
self._casd_channel = self._casd_process_manager.create_channel()
self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel)
+ self._cache_usage_monitor.start()
# get_cas():
#
@@ -132,7 +131,8 @@ class CASCache:
#
def release_resources(self, messenger=None):
if self._cache_usage_monitor:
- self._cache_usage_monitor.release_resources()
+ self._cache_usage_monitor.stop()
+ self._cache_usage_monitor.join()
if self._casd_process_manager:
self.close_grpc_channels()
@@ -731,65 +731,42 @@ class _CASCacheUsage:
# This manages the subprocess that tracks cache usage information via
# buildbox-casd.
#
-class _CASCacheUsageMonitor:
+class _CASCacheUsageMonitor(threading.Thread):
def __init__(self, connection):
+ super().__init__()
self._connection = connection
-
- # Shared memory (64-bit signed integer) for current disk usage and quota
- self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1)
- self._disk_quota = multiprocessing.Value(ctypes.c_longlong, -1)
-
- # multiprocessing.Process will fork without exec on Unix.
- # This can't be allowed with background threads or open gRPC channels.
- assert utils._is_single_threaded() and connection.is_closed()
-
- # Block SIGINT, we don't want to kill the process when we interrupt the frontend
- # and this process if very lightweight.
- with _signals.blocked([signal.SIGINT], ignore=False):
- self._subprocess = multiprocessing.Process(target=self._subprocess_run)
- self._subprocess.start()
+ self._disk_usage = None
+ self._disk_quota = None
+ self._should_stop = False
def get_cache_usage(self):
- disk_usage = self._disk_usage.value
- disk_quota = self._disk_quota.value
-
- if disk_usage < 0:
- # Disk usage still unknown
- disk_usage = None
-
- if disk_quota <= 0:
- # No disk quota
- disk_quota = None
-
- return _CASCacheUsage(disk_usage, disk_quota)
-
- def release_resources(self):
- # Simply terminate the subprocess, no cleanup required in the subprocess
- self._subprocess.terminate()
+ # FIXME: remove this abstraction
+ return _CASCacheUsage(self._disk_usage, self._disk_quota)
- def _subprocess_run(self):
- # Reset SIGTERM in subprocess to default as no cleanup is necessary
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ def stop(self):
+ self._should_stop = True
- disk_usage = self._disk_usage
- disk_quota = self._disk_quota
+ def run(self):
local_cas = self._connection.get_local_cas()
- while True:
+ while not self._should_stop:
try:
# Ask buildbox-casd for current value
request = local_cas_pb2.GetLocalDiskUsageRequest()
response = local_cas.GetLocalDiskUsage(request)
# Update values in shared memory
- disk_usage.value = response.size_bytes
- disk_quota.value = response.quota_bytes
+ self._disk_usage = response.size_bytes
+ self._disk_quota = response.quota_bytes
except grpc.RpcError:
# Terminate loop when buildbox-casd becomes unavailable
break
# Sleep until next refresh
- time.sleep(_CACHE_USAGE_REFRESH)
+ for _ in range(_CACHE_USAGE_REFRESH * 10):
+ if self._should_stop:
+ break
+ time.sleep(0.1)
def _grouper(iterable, n):
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 637c4e0b1..634f696ce 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -17,6 +17,7 @@
#
import contextlib
+import threading
import os
import random
import shutil
@@ -237,33 +238,42 @@ class CASDChannel:
self._casd_cas = None
self._local_cas = None
self._casd_pid = casd_pid
+ self._shutdown_requested = False
- def _establish_connection(self):
- assert self._casd_channel is None
+ self.lock = threading.Lock()
- while not os.path.exists(self._socket_path):
- # casd is not ready yet, try again after a 10ms delay,
- # but don't wait for more than specified timeout period
- if time.time() > self._start_time + _CASD_TIMEOUT:
- raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+ def _establish_connection(self):
+ with self.lock:
+ if self._casd_channel is not None:
+ return
- # check that process is still alive
- try:
- proc = psutil.Process(self._casd_pid)
- if proc.status() == psutil.STATUS_ZOMBIE:
- proc.wait()
+ while not os.path.exists(self._socket_path):
+ # casd is not ready yet, try again after a 10ms delay,
+ # but don't wait for more than specified timeout period
+ if time.time() > self._start_time + _CASD_TIMEOUT:
+ raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
- if not proc.is_running():
+ # check that process is still alive
+ try:
+ proc = psutil.Process(self._casd_pid)
+ if proc.status() == psutil.STATUS_ZOMBIE:
+ proc.wait()
+
+ if not proc.is_running():
+ if self._shutdown_requested:
+ return
+ raise CASCacheError("buildbox-casd process died before connection could be established")
+ except psutil.NoSuchProcess:
+ if self._shutdown_requested:
+ return
raise CASCacheError("buildbox-casd process died before connection could be established")
- except psutil.NoSuchProcess:
- raise CASCacheError("buildbox-casd process died before connection could be established")
- time.sleep(0.01)
+ time.sleep(0.01)
- self._casd_channel = grpc.insecure_channel(self._connection_string)
- self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel)
- self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
- self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
+ self._casd_channel = grpc.insecure_channel(self._connection_string)
+ self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel)
+ self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
+ self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
# get_cas():
#
@@ -279,12 +289,12 @@ class CASDChannel:
# Return LocalCAS stub for buildbox-casd channel.
#
def get_local_cas(self):
- if self._casd_channel is None:
+ if self._local_cas is None:
self._establish_connection()
return self._local_cas
def get_bytestream(self):
- if self._casd_channel is None:
+ if self._bytestream is None:
self._establish_connection()
return self._bytestream
@@ -300,10 +310,14 @@ class CASDChannel:
# Close the casd channel.
#
def close(self):
- if self.is_closed():
- return
- self._local_cas = None
- self._casd_cas = None
- self._bytestream = None
- self._casd_channel.close()
- self._casd_channel = None
+ with self.lock:
+ self._shutdown_requested = True
+
+ if self.is_closed():
+ return
+
+ self._local_cas = None
+ self._casd_cas = None
+ self._bytestream = None
+ self._casd_channel.close()
+ self._casd_channel = None
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index c0e92b98e..ef68d9556 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -549,17 +549,3 @@ class Context:
log_directory=self.logdir,
)
return self._cascache
-
- # prepare_fork():
- #
- # Prepare this process for fork without exec. This is a safeguard against
- # fork issues with multiple threads and gRPC connections.
- #
- def prepare_fork(self):
- # gRPC channels must be closed before fork.
- for cache in [self._cascache, self._artifactcache, self._sourcecache]:
- if cache:
- cache.close_grpc_channels()
-
- # Do not allow fork if there are background threads.
- return utils._is_single_threaded()
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 3a32a2467..222b05d17 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -19,10 +19,10 @@
import os
import datetime
+import threading
from contextlib import contextmanager
from . import _signals
-from . import utils
from ._exceptions import BstError
from ._message import Message, MessageType
@@ -48,15 +48,17 @@ class _TimeData:
class Messenger:
def __init__(self):
- self._message_handler = None
- self._silence_scope_depth = 0
- self._log_handle = None
- self._log_filename = None
self._state = None
self._next_render = None # A Time object
self._active_simple_tasks = 0
self._render_status_cb = None
+ self._locals = threading.local()
+ self._locals.message_handler = None
+ self._locals.log_handle = None
+ self._locals.log_filename = None
+ self._locals.silence_scope_depth = 0
+
# set_message_handler()
#
# Sets the handler for any status messages propagated through
@@ -70,7 +72,7 @@ class Messenger:
# ) -> None
#
def set_message_handler(self, handler):
- self._message_handler = handler
+ self._locals.message_handler = handler
# set_state()
#
@@ -101,7 +103,7 @@ class Messenger:
# (bool): Whether messages are currently being silenced
#
def _silent_messages(self):
- return self._silence_scope_depth > 0
+ return self._locals.silence_scope_depth > 0
# message():
#
@@ -112,16 +114,15 @@ class Messenger:
# message: A Message object
#
def message(self, message):
-
# If we are recording messages, dump a copy into the open log file.
self._record_message(message)
# Send it off to the log handler (can be the frontend,
# or it can be the child task which will propagate
# to the frontend)
- assert self._message_handler
+ assert self._locals.message_handler
- self._message_handler(message, is_silenced=self._silent_messages())
+ self._locals.message_handler(message, is_silenced=self._silent_messages())
# silence()
#
@@ -141,12 +142,12 @@ class Messenger:
yield
return
- self._silence_scope_depth += 1
+ self._locals.silence_scope_depth += 1
try:
yield
finally:
- assert self._silence_scope_depth > 0
- self._silence_scope_depth -= 1
+ assert self._locals.silence_scope_depth > 0
+ self._locals.silence_scope_depth -= 1
# timed_activity()
#
@@ -265,22 +266,21 @@ class Messenger:
#
@contextmanager
def recorded_messages(self, filename, logdir):
-
# We dont allow recursing in this context manager, and
# we also do not allow it in the main process.
- assert self._log_handle is None
- assert self._log_filename is None
- assert not utils._is_main_process()
+ assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
+ assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None
# Create the fully qualified logfile in the log directory,
# appending the pid and .log extension at the end.
- self._log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+ self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+ self._locals.silence_scope_depth = 0
# Ensure the directory exists first
- directory = os.path.dirname(self._log_filename)
+ directory = os.path.dirname(self._locals.log_filename)
os.makedirs(directory, exist_ok=True)
- with open(self._log_filename, "a") as logfile:
+ with open(self._locals.log_filename, "a") as logfile:
# Write one last line to the log and flush it to disk
def flush_log():
@@ -295,12 +295,12 @@ class Messenger:
except RuntimeError:
os.fsync(logfile.fileno())
- self._log_handle = logfile
+ self._locals.log_handle = logfile
with _signals.terminator(flush_log):
- yield self._log_filename
+ yield self._locals.log_filename
- self._log_handle = None
- self._log_filename = None
+ self._locals.log_handle = None
+ self._locals.log_filename = None
# get_log_handle()
#
@@ -312,7 +312,7 @@ class Messenger:
# (file): The active logging file handle, or None
#
def get_log_handle(self):
- return self._log_handle
+ return self._locals.log_handle
# get_log_filename()
#
@@ -324,7 +324,7 @@ class Messenger:
# (str): The active logging filename, or None
#
def get_log_filename(self):
- return self._log_filename
+ return self._locals.log_filename
# _record_message()
#
@@ -335,7 +335,7 @@ class Messenger:
#
def _record_message(self, message):
- if self._log_handle is None:
+ if self._locals.log_handle is None:
return
INDENT = " "
@@ -372,8 +372,8 @@ class Messenger:
)
# Write to the open log file
- self._log_handle.write("{}\n".format(text))
- self._log_handle.flush()
+ self._locals.log_handle.write("{}\n".format(text))
+ self._locals.log_handle.flush()
# _render_status()
#
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
index d8b8e68fe..6d52ff56a 100644
--- a/src/buildstream/_remote.py
+++ b/src/buildstream/_remote.py
@@ -16,6 +16,7 @@
#
import os
+import threading
from collections import namedtuple
from urllib.parse import urlparse
@@ -146,41 +147,44 @@ class BaseRemote:
self.push = spec.push
self.url = spec.url
+ self._lock = threading.Lock()
+
# init():
#
# Initialize the given remote. This function must be called before
# any communication is performed, since such will otherwise fail.
#
def init(self):
- if self._initialized:
- return
-
- # Set up the communcation channel
- url = urlparse(self.spec.url)
- if url.scheme == "http":
- port = url.port or 80
- self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port))
- elif url.scheme == "https":
- port = url.port or 443
- try:
- server_cert, client_key, client_cert = _read_files(
- self.spec.server_cert, self.spec.client_key, self.spec.client_cert
+ with self._lock:
+ if self._initialized:
+ return
+
+ # Set up the communication channel
+ url = urlparse(self.spec.url)
+ if url.scheme == "http":
+ port = url.port or 80
+ self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port))
+ elif url.scheme == "https":
+ port = url.port or 443
+ try:
+ server_cert, client_key, client_cert = _read_files(
+ self.spec.server_cert, self.spec.client_key, self.spec.client_cert
+ )
+ except FileNotFoundError as e:
+ raise RemoteError("Could not read certificates: {}".format(e)) from e
+ self.server_cert = server_cert
+ self.client_key = client_key
+ self.client_cert = client_cert
+ credentials = grpc.ssl_channel_credentials(
+ root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert
)
- except FileNotFoundError as e:
- raise RemoteError("Could not read certificates: {}".format(e)) from e
- self.server_cert = server_cert
- self.client_key = client_key
- self.client_cert = client_cert
- credentials = grpc.ssl_channel_credentials(
- root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert
- )
- self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials)
- else:
- raise RemoteError("Unsupported URL: {}".format(self.spec.url))
+ self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials)
+ else:
+ raise RemoteError("Unsupported URL: {}".format(self.spec.url))
- self._configure_protocols()
+ self._configure_protocols()
- self._initialized = True
+ self._initialized = True
def __enter__(self):
return self
diff --git a/src/buildstream/_scheduler/_multiprocessing.py b/src/buildstream/_scheduler/_multiprocessing.py
deleted file mode 100644
index 4864e140c..000000000
--- a/src/buildstream/_scheduler/_multiprocessing.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# Copyright (C) 2019 Bloomberg Finance LP
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-
-# TLDR:
-# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process`
-#
-#
-# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running.
-#
-# The main problem that affects us is that the parent and the child will share some file handlers.
-# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received
-# by the app so that the asyncio loop can treat them afterwards.
-#
-# This sharing means that when we send a signal to the child, the sighandler in the child will write
-# it back to the parent sig_handler_fd, making the parent have to treat it too.
-# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children,
-# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to
-# the children...
-#
-# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process that automatically
-# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers.
-#
-#
-# Relevant issues:
-# - Asyncio: support fork (https://bugs.python.org/issue21998)
-# - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087)
-# - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489)
-#
-#
-
-import multiprocessing
-import signal
-import sys
-from asyncio import set_event_loop_policy
-
-
-# _AsyncioSafeForkAwareProcess()
-#
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-#
-# Also automatically close any running asyncio loop before calling
-# the actual run target
-#
-class _AsyncioSafeForkAwareProcess(multiprocessing.Process):
- # pylint: disable=attribute-defined-outside-init
- def start(self):
- self._popen = self._Popen(self)
- self._sentinel = self._popen.sentinel
-
- def run(self):
- signal.set_wakeup_fd(-1)
- set_event_loop_policy(None)
-
- super().run()
-
-
-if sys.platform != "win32":
- # Set the default event loop policy to automatically close our asyncio loop in child processes
- AsyncioSafeProcess = _AsyncioSafeForkAwareProcess
-
-else:
- # Windows doesn't support ChildWatcher that way anyways, we'll need another
- # implementation if we want it
- AsyncioSafeProcess = multiprocessing.Process
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 7ea87dc62..a4ace4187 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,17 +24,13 @@
import asyncio
import datetime
import multiprocessing
-import os
-import signal
-import sys
import traceback
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
-from ... import _signals, utils
-from .. import _multiprocessing
+from ... import _signals
# Return code values shutdown of job handling child processes
@@ -130,7 +126,6 @@ class Job:
self._scheduler = scheduler # The scheduler
self._messenger = self._scheduler.context.messenger
self._pipe_r = None # The read end of a pipe for message passing
- self._process = None # The Process object
self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
@@ -143,6 +138,8 @@ class Job:
self._message_element_key = None # The element key for messaging
self._element = None # The Element() passed to the Job() constructor, if applicable
+ self._task = None # The task that is run
+
# set_name()
#
# Sets the name of this job
@@ -157,11 +154,13 @@ class Job:
assert not self._terminated, "Attempted to start process which was already terminated"
+ # FIXME: remove this, this is not necessary when using asyncio
self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
self._tries += 1
self._parent_start_listening()
+ # FIXME: remove the parent/child separation, it's not needed anymore.
child_job = self.create_child_job( # pylint: disable=assignment-from-no-return
self.action_name,
self._messenger,
@@ -173,26 +172,18 @@ class Job:
self._message_element_key,
)
- self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[pipe_w],)
-
- # Block signals which are handled in the main process such that
- # the child process does not inherit the parent's state, but the main
- # process will be notified of any signal after we launch the child.
- #
- with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
- with asyncio.get_child_watcher() as watcher:
- self._process.start()
- # Register the process to call `_parent_child_completed` once it is done
-
- # Close the write end of the pipe in the parent
- pipe_w.close()
+ loop = asyncio.get_event_loop()
- # Here we delay the call to the next loop tick. This is in order to be running
- # in the main thread, as the callback itself must be thread safe.
- def on_completion(pid, returncode):
- asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode)
+ async def execute():
+ try:
+ result = await loop.run_in_executor(None, child_job.child_action, pipe_w)
+ except asyncio.CancelledError:
+ result = _ReturnCode.TERMINATED
+ except Exception: # pylint: disable=broad-except
+ result = _ReturnCode.FAIL
+ await self._parent_child_completed(result)
- watcher.add_child_handler(self._process.pid, on_completion)
+ self._task = loop.create_task(execute())
# terminate()
#
@@ -201,18 +192,14 @@ class Job:
# This will send a SIGTERM signal to the Job process.
#
def terminate(self):
-
- # First resume the job if it's suspended
- self.resume(silent=True)
-
self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
# Make sure there is no garbage on the pipe
self._parent_stop_listening()
# Terminate the process using multiprocessing API pathway
- if self._process:
- self._process.terminate()
+ if self._task:
+ self._task.cancel()
self._terminated = True
@@ -226,51 +213,6 @@ class Job:
def get_terminated(self):
return self._terminated
- # kill()
- #
- # Forcefully kill the process, and any children it might have.
- #
- def kill(self):
- # Force kill
- self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name))
- if self._process:
- utils._kill_process_tree(self._process.pid)
-
- # suspend()
- #
- # Suspend this job.
- #
- def suspend(self):
- if not self._suspended:
- self.message(MessageType.STATUS, "{} suspending".format(self.action_name))
-
- try:
- # Use SIGTSTP so that child processes may handle and propagate
- # it to processes they start that become session leaders.
- os.kill(self._process.pid, signal.SIGTSTP)
-
- # For some reason we receive exactly one suspend event for
- # every SIGTSTP we send to the child process, even though the
- # child processes are setsid(). We keep a count of these so we
- # can ignore them in our event loop suspend_event().
- self._scheduler.internal_stops += 1
- self._suspended = True
- except ProcessLookupError:
- # ignore, process has already exited
- pass
-
- # resume()
- #
- # Resume this suspended job.
- #
- def resume(self, silent=False):
- if self._suspended:
- if not silent and not self._scheduler.terminated:
- self.message(MessageType.STATUS, "{} resuming".format(self.action_name))
-
- os.kill(self._process.pid, signal.SIGCONT)
- self._suspended = False
-
# set_message_element_name()
#
# This is called by Job subclasses to set the plugin instance element
@@ -397,10 +339,9 @@ class Job:
# Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
#
# Args:
- # pid (int): The PID of the child which completed
# returncode (int): The return code of the child process
#
- def _parent_child_completed(self, pid, returncode):
+ async def _parent_child_completed(self, returncode):
self._parent_shutdown()
try:
@@ -431,16 +372,16 @@ class Job:
status = JobStatus.FAIL
elif returncode == _ReturnCode.TERMINATED:
if self._terminated:
- self.message(MessageType.INFO, "Process was terminated")
+ self.message(MessageType.INFO, "Job terminated")
else:
- self.message(MessageType.ERROR, "Process was terminated unexpectedly")
+ self.message(MessageType.ERROR, "Job was terminated unexpectedly")
status = JobStatus.FAIL
elif returncode == _ReturnCode.KILLED:
if self._terminated:
- self.message(MessageType.INFO, "Process was killed")
+ self.message(MessageType.INFO, "Job was killed")
else:
- self.message(MessageType.ERROR, "Process was killed unexpectedly")
+ self.message(MessageType.ERROR, "Job was killed unexpectedly")
status = JobStatus.FAIL
else:
@@ -451,7 +392,7 @@ class Job:
# Force the deletion of the pipe and process objects to try and clean up FDs
self._pipe_r.close()
- self._pipe_r = self._process = None
+ self._pipe_r = self._task = None
# _parent_process_envelope()
#
@@ -655,19 +596,6 @@ class ChildJob:
# pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
#
def child_action(self, pipe_w):
-
- # This avoids some SIGTSTP signals from grandchildren
- # getting propagated up to the master process
- os.setsid()
-
- # First set back to the default signal handlers for the signals
- # we handle, and then clear their blocked state.
- #
- signal_list = [signal.SIGTSTP, signal.SIGTERM]
- for sig in signal_list:
- signal.signal(sig, signal.SIG_DFL)
- signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
-
# Assign the pipe we passed across the process boundaries
#
# Set the global message handler in this child
@@ -687,16 +615,11 @@ class ChildJob:
nonlocal starttime
starttime += datetime.datetime.now() - stopped_time
- # Graciously handle sigterms.
- def handle_sigterm():
- self._child_shutdown(_ReturnCode.TERMINATED)
-
# Time, log and and run the action function
#
- with _signals.terminator(handle_sigterm), _signals.suspendable(
- stop_time, resume_time
- ), self._messenger.recorded_messages(self._logfile, self._logdir) as filename:
-
+ with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages(
+ self._logfile, self._logdir
+ ) as filename:
self.message(MessageType.START, self.action_name, logfile=filename)
try:
@@ -707,7 +630,7 @@ class ChildJob:
self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
- self._child_shutdown(_ReturnCode.SKIPPED)
+ return _ReturnCode.SKIPPED
except BstError as e:
elapsed = datetime.datetime.now() - starttime
retry_flag = e.temporary
@@ -731,7 +654,7 @@ class ChildJob:
# Set return code based on whether or not the error was temporary.
#
- self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL)
+ return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
except Exception: # pylint: disable=broad-except
@@ -744,7 +667,7 @@ class ChildJob:
self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
# Unhandled exceptions should permenantly fail
- self._child_shutdown(_ReturnCode.PERM_FAIL)
+ return _ReturnCode.PERM_FAIL
else:
# No exception occurred in the action
@@ -757,7 +680,9 @@ class ChildJob:
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
# is already busy in sys.exit()
- self._child_shutdown(_ReturnCode.OK)
+ return _ReturnCode.OK
+ finally:
+ self._pipe_w.close()
#######################################################
# Local Private Methods #
@@ -809,18 +734,6 @@ class ChildJob:
if result is not None:
self._send_message(_MessageType.RESULT, result)
- # _child_shutdown()
- #
- # Shuts down the child process by cleaning up and exiting the process
- #
- # Args:
- # exit_code (_ReturnCode): The exit code to exit with
- #
- def _child_shutdown(self, exit_code):
- self._pipe_w.close()
- assert isinstance(exit_code, _ReturnCode)
- sys.exit(exit_code.value)
-
# _child_message_handler()
#
# A Context delegate for handling messages, this replaces the
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 3e6bf1f92..7380f07e8 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -20,12 +20,14 @@
# Jürg Billeter <juerg.billeter@codethink.co.uk>
# System imports
+import functools
import os
import asyncio
from itertools import chain
import signal
import datetime
import sys
+from concurrent.futures import ThreadPoolExecutor
# Local imports
from .resources import Resources
@@ -34,9 +36,7 @@ from ..types import FastEnum
from .._profile import Topics, PROFILER
from .._message import Message, MessageType
from ..plugin import Plugin
-
-
-_MAX_TIMEOUT_TO_KILL_CHILDREN = 20 # in seconds
+from .. import _signals
# A decent return code for Scheduler.run()
@@ -46,6 +46,23 @@ class SchedStatus(FastEnum):
TERMINATED = 1
+def reset_signals_on_exit(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ orig_sigint = signal.getsignal(signal.SIGINT)
+ orig_sigterm = signal.getsignal(signal.SIGTERM)
+ orig_sigtstp = signal.getsignal(signal.SIGTSTP)
+
+ try:
+ return func(*args, **kwargs)
+ finally:
+ signal.signal(signal.SIGINT, orig_sigint)
+ signal.signal(signal.SIGTERM, orig_sigterm)
+ signal.signal(signal.SIGTSTP, orig_sigtstp)
+
+ return wrapper
+
+
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -79,7 +96,6 @@ class Scheduler:
# These are shared with the Job, but should probably be removed or made private in some way.
self.loop = None # Shared for Job access to observe the message queue
- self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
#
# Private members
@@ -113,6 +129,7 @@ class Scheduler:
# elements have been processed by each queue or when
# an error arises
#
+ @reset_signals_on_exit
def run(self, queues, casd_process_manager):
# Hold on to the queues to process
@@ -149,10 +166,14 @@ class Scheduler:
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
- # Run the queues
- self._sched()
- self.loop.run_forever()
- self.loop.close()
+ # FIXME: this should be done in a cleaner way
+ with _signals.suspendable(lambda: None, lambda: None), _signals.terminator(lambda: None):
+ with ThreadPoolExecutor(max_workers=sum(self.resources._max_resources.values())) as pool:
+ self.loop.set_default_executor(pool)
+ # Run the queues
+ self._sched()
+ self.loop.run_forever()
+ self.loop.close()
# Stop watching casd
_watcher.remove_child_handler(self._casd_process.pid)
@@ -348,13 +369,6 @@ class Scheduler:
# If that happens, do another round.
process_queues = any(q.dequeue_ready() for q in self.queues)
- # Make sure fork is allowed before starting jobs
- if not self.context.prepare_fork():
- message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
- self.context.messenger.message(message)
- self.terminate()
- return
-
# Start the jobs
#
for job in ready:
@@ -412,9 +426,9 @@ class Scheduler:
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
- # Notify that we're suspended
- for job in self._active_jobs:
- job.suspend()
+
+ for suspender in reversed(_signals.suspendable_stack):
+ suspender.suspend()
# _resume_jobs()
#
@@ -422,8 +436,9 @@ class Scheduler:
#
def _resume_jobs(self):
if self.suspended:
- for job in self._active_jobs:
- job.resume()
+ for suspender in _signals.suspendable_stack:
+ suspender.resume()
+
self.suspended = False
# Notify that we're unsuspended
self._state.offset_start_time(datetime.datetime.now() - self._suspendtime)
@@ -455,12 +470,6 @@ class Scheduler:
# A loop registered event callback for SIGTSTP
#
def _suspend_event(self):
-
- # Ignore the feedback signals from Job.suspend()
- if self.internal_stops:
- self.internal_stops -= 1
- return
-
# No need to care if jobs were suspended or not, we _only_ handle this
# while we know jobs are not suspended.
self._suspend_jobs()
@@ -482,12 +491,8 @@ class Scheduler:
self.loop.remove_signal_handler(signal.SIGTERM)
def _terminate_jobs_real(self):
- def kill_jobs():
- for job_ in self._active_jobs:
- job_.kill()
-
- # Schedule all jobs to be killed if they have not exited after timeout
- self.loop.call_later(_MAX_TIMEOUT_TO_KILL_CHILDREN, kill_jobs)
+ for terminator in _signals.terminator_stack.copy():
+ terminator()
for job in self._active_jobs:
job.terminate()
diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index 7c2da1c02..b9ca91945 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -259,6 +259,11 @@ class DownloadableFileSource(Source):
return self.__default_mirror_file
+ @classmethod
+ def _reset_url_opener(cls):
+ # Needed for tests, in order to clean up the `netrc` configuration.
+ cls.__urlopener = None
+
def __get_urlopener(self):
if not DownloadableFileSource.__urlopener:
try:
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index f15d5a628..84315904f 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -990,19 +990,11 @@ class Source(Plugin):
clean = node.strip_node_info()
to_modify = node.strip_node_info()
- current_ref = self.get_ref() # pylint: disable=assignment-from-no-return
-
# Set the ref regardless of whether it changed, the
# TrackQueue() will want to update a specific node with
# the ref, regardless of whether the original has changed.
self.set_ref(new_ref, to_modify)
- if current_ref == new_ref or not save:
- # Note: We do not look for and propagate changes at this point
- # which might result in desync depending if something changes about
- # tracking in the future. For now, this is quite safe.
- return False
-
actions = {}
for k, v in clean.items():
if k not in to_modify:
diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py
index 5d1c1d227..520f68587 100644
--- a/src/buildstream/testing/_fixtures.py
+++ b/src/buildstream/testing/_fixtures.py
@@ -42,3 +42,4 @@ def thread_check(default_thread_number):
@pytest.fixture(autouse=True)
def reset_global_node_state():
node._reset_global_state()
+ DownloadableFileSource._reset_url_opener()
diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py
index 043531c24..e27e40974 100644
--- a/tests/internals/cascache.py
+++ b/tests/internals/cascache.py
@@ -3,6 +3,7 @@ import time
from unittest.mock import MagicMock
from buildstream._cas.cascache import CASCache
+from buildstream._cas import casdprocessmanager
from buildstream._message import MessageType
from buildstream._messenger import Messenger
@@ -31,6 +32,10 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch):
dummy_buildbox_casd.write_text("#!/usr/bin/env sh\nwhile :\ndo\nsleep 60\ndone")
dummy_buildbox_casd.chmod(0o777)
monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+ # FIXME: this is a hack; the fake casd script should instead create its
+ # socket properly. This whole test suite would probably benefit from
+ # some cleanup.
+ monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
messenger = MagicMock(spec_set=Messenger)
cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))
@@ -50,6 +55,10 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch):
dummy_buildbox_casd.write_text("#!/usr/bin/env sh\ntrap 'echo hello' TERM\nwhile :\ndo\nsleep 60\ndone")
dummy_buildbox_casd.chmod(0o777)
monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+ # FIXME: this is a hack; the fake casd script should instead create its
+ # socket properly. This whole test suite would probably benefit from
+ # some cleanup.
+ monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
messenger = MagicMock(spec_set=Messenger)
cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))