summaryrefslogtreecommitdiff
path: root/src/buildstream/_signals.py
blob: ef2ba1965062a18f99b90fed4df6d04ebba221e0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#
#  Copyright (C) 2017 Codethink Limited
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
import os
import signal
import sys
import threading
import traceback
from contextlib import contextmanager, ExitStack
from collections import deque
from typing import Callable, Deque


# Global per process state for handling of sigterm/sigtstp/sigcont,
# note that it is expected that this only ever be used by new processes
# the scheduler starts, not the main process.
#
terminator_stack: Deque[Callable] = deque()
suspendable_stack: Deque[Callable] = deque()

terminator_lock = threading.Lock()
suspendable_lock = threading.Lock()

# This event is used to block all the threads while we wait for user
# interaction. This is because we can't stop all the pythin threads but the
# one easily when waiting for user input. However, most performance intensive
# tasks will pass through a subprocess or a multiprocess.Process and all of
# those are guarded by the signal handling. Thus, by setting and unsetting this
# event in the scheduler, we can enable and disable the launching of processes
# and ensure we don't do anything resource intensive while being interrupted.
is_not_suspended = threading.Event()
is_not_suspended.set()


class TerminateException(BaseException):
    pass


# Per process SIGTERM handler
def terminator_handler(signal_, frame):
    exit_code = -1

    while terminator_stack:
        terminator_ = terminator_stack.pop()
        try:
            terminator_()
        except SystemExit as e:
            exit_code = e.code or 0
        except:  # noqa pylint: disable=bare-except
            # Ensure we print something if there's an exception raised when
            # processing the handlers. Note that the default exception
            # handler won't be called because we os._exit next, so we must
            # catch all possible exceptions with the unqualified 'except'
            # clause.
            traceback.print_exc(file=sys.stderr)
            print(
                "Error encountered in BuildStream while processing custom SIGTERM handler:",
                terminator_,
                file=sys.stderr,
            )

    # Use special exit here, terminate immediately, recommended
    # for precisely this situation where child processes are teminated.
    os._exit(exit_code)


# terminator()
#
# A context manager for interruptable tasks, this guarantees
# that while the code block is running, the supplied function
# will be called upon process termination.
#
# /!\ The callbacks passed must only contain code that does not acces thread
#     local variables. Those will run in the main thread.
#
# Note that after handlers are called, the termination will be handled by
# terminating immediately with os._exit(). This means that SystemExit will not
# be raised and 'finally' clauses will not be executed.
#
# Args:
#    terminate_func (callable): A function to call when aborting
#                               the nested code block.
#
@contextmanager
def terminator(terminate_func):
    global terminator_stack  # pylint: disable=global-statement

    outermost = bool(not terminator_stack)

    assert threading.current_thread() == threading.main_thread() or not outermost

    with terminator_lock:
        terminator_stack.append(terminate_func)

    if outermost:
        original_handler = signal.signal(signal.SIGTERM, terminator_handler)

    try:
        yield
    except TerminateException:
        terminate_func()
        raise
    finally:
        if outermost:
            signal.signal(signal.SIGTERM, original_handler)

        with terminator_lock:
            terminator_stack.remove(terminate_func)


# Just a simple object for holding on to two callbacks
class Suspender:
    def __init__(self, suspend_callback, resume_callback):
        self.suspend = suspend_callback
        self.resume = resume_callback


# Per process SIGTSTP handler
def suspend_handler(sig, frame):
    is_not_suspended.clear()

    # Suspend callbacks from innermost frame first
    with suspendable_lock:
        for suspender in reversed(suspendable_stack):
            suspender.suspend()

        # Use SIGSTOP directly now on self, dont introduce more SIGTSTP
        #
        # Here the process sleeps until SIGCONT, which we simply
        # dont handle. We know we'll pickup execution right here
        # when we wake up.
        os.kill(os.getpid(), signal.SIGSTOP)

        # Resume callbacks from outermost frame inwards
        for suspender in suspendable_stack:
            suspender.resume()

    is_not_suspended.set()


# suspendable()
#
# A context manager for handling process suspending and resumeing
#
# Args:
#    suspend_callback (callable): A function to call as process suspend time.
#    resume_callback (callable): A function to call as process resume time.
#
# /!\ The callbacks passed must only contain code that does not acces thread
#     local variables. Those will run in the main thread.
#
# This must be used in code blocks which start processes that become
# their own session leader. In these cases, SIGSTOP and SIGCONT need
# to be propagated to the child process group.
#
# This context manager can also be used recursively, so multiple
# things can happen at suspend/resume time (such as tracking timers
# and ensuring durations do not count suspended time).
#
@contextmanager
def suspendable(suspend_callback, resume_callback):
    global suspendable_stack  # pylint: disable=global-statement

    outermost = bool(not suspendable_stack)
    assert threading.current_thread() == threading.main_thread() or not outermost

    # If we are not in the main thread, ensure that we are not suspended
    # before running.
    # If we are in the main thread, never block on this, to ensure we
    # don't deadlock.
    if threading.current_thread() != threading.main_thread():
        is_not_suspended.wait()

    suspender = Suspender(suspend_callback, resume_callback)

    with suspendable_lock:
        suspendable_stack.append(suspender)

    if outermost:
        original_stop = signal.signal(signal.SIGTSTP, suspend_handler)

    try:
        yield
    finally:
        if outermost:
            signal.signal(signal.SIGTSTP, original_stop)

        with suspendable_lock:
            suspendable_stack.remove(suspender)


# blocked()
#
# A context manager for running a code block with blocked signals
#
# Args:
#    signals (list): A list of unix signals to block
#    ignore (bool): Whether to ignore entirely the signals which were
#                   received and pending while the process had blocked them
#
@contextmanager
def blocked(signal_list, ignore=True):

    with ExitStack() as stack:

        # Optionally add the ignored() context manager to this context
        if ignore:
            stack.enter_context(ignored(signal_list))

        # Set and save the sigprocmask
        blocked_signals = signal.pthread_sigmask(signal.SIG_BLOCK, signal_list)

        try:
            yield
        finally:
            # If we have discarded the signals completely, this line will cause
            # the discard_handler() to trigger for each signal in the list
            signal.pthread_sigmask(signal.SIG_SETMASK, blocked_signals)


# ignored()
#
# A context manager for running a code block with ignored signals
#
# Args:
#    signals (list): A list of unix signals to ignore
#
@contextmanager
def ignored(signal_list):

    orig_handlers = {}
    for sig in signal_list:
        orig_handlers[sig] = signal.signal(sig, signal.SIG_IGN)

    try:
        yield
    finally:
        for sig in signal_list:
            signal.signal(sig, orig_handlers[sig])