summaryrefslogtreecommitdiff
path: root/buildstream/_scheduler/jobs/elementjob.py
blob: 68f4e0406e04348adf3e30049c46539054731730 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#  Copyright (C) 2018 Codethink Limited
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Author:
#        Tristan Daniƫl Maat <tristan.maat@codethink.co.uk>
#
import os
from contextlib import contextmanager

from ruamel import yaml

from ..._message import Message, MessageType
from ...plugin import _plugin_lookup
from ... import _signals

from .job import Job


# ElementJob()
#
# A job to run an element's commands. When this job is spawned
# `action_cb` will be called, and when it completes `complete_cb` will
# be called.
#
# Args:
#    scheduler (Scheduler): The scheduler
#    action_name (str): The queue action name
#    max_retries (int): The maximum number of retries
#    action_cb (callable): The function to execute in the child process
#    complete_cb (callable): The function to execute when the job completes
#    element (Element): The element to work on
#    queue: The queue associated with this job (stored as `self.queue`)
#    kwargs: Remaining Job() constructor arguments
#
# Here is the calling signature of the action_cb:
#
#     action_cb(element):
#
#     This function will be called in the child task
#
#     Args:
#        element (Element): The element passed to the ElementJob() constructor
#
#     Returns:
#        (object): Any abstract simple python object, including a string, int,
#                  bool, list or dict; this must be a simple serializable object.
#
# Here is the calling signature of the complete_cb:
#
#     complete_cb(job, element, success, result):
#
#     This function will be called when the child task completes
#
#     Args:
#        job (Job): The job object which completed
#        element (Element): The element passed to the ElementJob() constructor
#        success (bool): True if the action_cb did not raise an exception
#        result (object): The deserialized object returned by the `action_cb`, or None
#                         if `success` is False
#
class ElementJob(Job):
    def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = queue                     # The queue associated with this job
        self._element = element                # The element this job works on
        self._action_cb = action_cb            # The action callable function
        self._complete_cb = complete_cb        # The complete callable function

    # element
    #
    # The element this job operates on (read-only).
    #
    @property
    def element(self):
        return self._element

    # _child_process()
    #
    # This will be executed after fork(), and is intended to perform
    # the job's task.
    #
    # Returns:
    #    (any): A (simple!) object to be returned to the main thread
    #           as the result.
    #
    def _child_process(self):
        return self._action_cb(self._element)

    # _parent_complete()
    #
    # Forwards the outcome of the job to the `complete_cb` that was
    # passed to the constructor.
    #
    # Args:
    #    success (bool): Whether the child action succeeded
    #    result (object): The deserialized result of `action_cb`, or None
    #
    def _parent_complete(self, success, result):
        # Use the result we were handed rather than reaching back into
        # self._result, so this hook honours its own signature.
        self._complete_cb(self, self._element, success, result)

    # _child_logging_enabled()
    #
    # Start the log for this job. This function will be given a
    # template string for the path to a log file - this will contain
    # "{pid}", which should be replaced with the current process'
    # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
    #
    # Args:
    #    logfile (str): A template string that points to the logfile
    #                   that should be used - replace {pid} first.
    #
    # Yields:
    #    (str) The path to the logfile with {pid} replaced.
    #
    @contextmanager
    def _child_logging_enabled(self, logfile):
        self._logfile = logfile.format(pid=os.getpid())

        with open(self._logfile, 'a') as log:
            # Write one last line to the log and flush it to disk
            def flush_log():

                # If the process currently had something happening in the I/O stack
                # then trying to reenter the I/O stack will fire a runtime error.
                #
                # So just try to flush as well as we can at SIGTERM time
                try:
                    # FIXME: Better logging

                    log.write('\n\nAction {} for element {} forcefully terminated\n'
                              .format(self.action_name, self._element.name))
                    log.flush()
                except RuntimeError:
                    os.fsync(log.fileno())

            self._element._set_log_handle(log)
            try:
                with _signals.terminator(flush_log):
                    self._print_start_message(self._element, self._logfile)
                    yield self._logfile
            finally:
                # Always detach the handle, even when the body raised at
                # the yield point, so the element never keeps a reference
                # to the file that the surrounding `with open(...)` closes.
                self._element._set_log_handle(None)
                self._logfile = None

    # _message():
    #
    # Sends a message to the frontend
    #
    # Args:
    #    message_type (MessageType): The type of message to send
    #    message (str): The message
    #    kwargs: Remaining Message() constructor arguments
    #
    def _message(self, message_type, message, **kwargs):
        # **kwargs is already a fresh dict local to this call, so it is
        # safe to tag it in place.
        kwargs['scheduler'] = True
        self._scheduler.context.message(
            Message(self._element._get_unique_id(),
                    message_type,
                    message,
                    **kwargs))

    # _print_start_message()
    #
    # Emits a START message for this job's action, followed by a LOG
    # message whose detail is the element's environment dumped as YAML.
    #
    # Args:
    #    element (Element): The element whose environment is dumped
    #    logfile (str): The resolved logfile path attached to both messages
    #
    def _print_start_message(self, element, logfile):
        self._message(MessageType.START, self.action_name, logfile=logfile)

        # Print the element's environment at the beginning of any element's log file.
        #
        # This should probably be omitted for non-build tasks but it's harmless here
        elt_env = element.get_environment()
        env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
        self._message(MessageType.LOG,
                      "Build environment for element {}".format(element.name),
                      detail=env_dump, logfile=logfile)

    # _child_log()
    #
    # Log a message returned by the frontend's main message handler
    # and return it to the main process.
    #
    # Arguments:
    #     message (str): The message to log
    #
    # Returns:
    #     message (Message): A message object
    #
    def _child_log(self, message):
        # Tag them on the way out the door...
        message.action_name = self.action_name
        message.task_id = self._element._get_unique_id()

        # Use the plugin for the task for the output, not a plugin
        # which might be acting on behalf of the task
        plugin = _plugin_lookup(message.task_id)

        with plugin._output_file() as output:
            message_text = self._format_frontend_message(message, '[{}]'.format(plugin.name))
            output.write('{}\n'.format(message_text))
            output.flush()

        return message

    # _child_process_data()
    #
    # Abstract method to retrieve additional data that should be
    # returned to the parent process. Note that the job result is
    # retrieved independently.
    #
    # Values can later be retrieved in Job.child_data.
    #
    # Returns:
    #    (dict) A dict containing values later to be read by _process_sync_data
    #
    def _child_process_data(self):
        data = {}

        workspace = self._element._get_workspace()

        # Only report workspace state when the element actually has one
        if workspace is not None:
            data['workspace'] = workspace.to_dict()

        return data