diff options
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 69 |
1 files changed, 50 insertions, 19 deletions
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 348204750..b8b4a2c76 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -58,10 +58,10 @@ class JobStatus(): # Used to distinguish between status messages and return values -class Envelope(): +class _Envelope(): def __init__(self, message_type, message): - self._message_type = message_type - self._message = message + self.message_type = message_type + self.message = message # Process class that doesn't call waitpid on its own. @@ -263,10 +263,37 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id + # send_message() + # + # To be called from inside Job.child_process() implementations + # to send messages to the main process during processing. + # + # These messages will be processed by the class's Job.handle_message() + # implementation. + # + def send_message(self, message_type, message): + self._queue.put(_Envelope(message_type, message)) + ####################################################### # Abstract Methods # ####################################################### + # handle_message() + # + # Handle a custom message. This will be called in the main process in + # response to any messages sent to the main proces using the + # Job.send_message() API from inside a Job.child_process() implementation + # + # Args: + # message_type (str): A string to identify the message type + # message (any): A simple serializable object + # + # Returns: + # (bool): Should return a truthy value if message_type is handled. + # + def handle_message(self, message_type, message): + return False + # parent_complete() # # This will be executed after the job finishes, and is expected to @@ -404,7 +431,7 @@ class Job(): elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox) - self._queue.put(Envelope('child_data', self.child_process_data())) + self._queue.put(_Envelope('child_data', self.child_process_data())) # Report the exception to the parent (for internal testing purposes) self._child_send_error(e) @@ -430,7 +457,7 @@ class Job(): else: # No exception occurred in the action - self._queue.put(Envelope('child_data', self.child_process_data())) + self._queue.put(_Envelope('child_data', self.child_process_data())) self._child_send_result(result) elapsed = datetime.datetime.now() - starttime @@ -457,7 +484,7 @@ class Job(): domain = e.domain reason = e.reason - envelope = Envelope('error', { + envelope = _Envelope('error', { 'domain': domain, 'reason': reason }) @@ -475,7 +502,7 @@ class Job(): # def _child_send_result(self, result): if result is not None: - envelope = Envelope('result', result) + envelope = _Envelope('result', result) self._queue.put(envelope) # _child_shutdown() @@ -512,7 +539,7 @@ class Job(): if message.message_type == MessageType.LOG: return - self._queue.put(Envelope('message', message)) + self._queue.put(_Envelope('message', message)) # _parent_shutdown() # @@ -573,24 +600,28 @@ class Job(): if not self._listening: return - if envelope._message_type == 'message': + if envelope.message_type == 'message': # Propagate received messages from children # back through the context. - self._scheduler.context.message(envelope._message) - elif envelope._message_type == 'error': + self._scheduler.context.message(envelope.message) + elif envelope.message_type == 'error': # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py - set_last_task_error(envelope._message['domain'], - envelope._message['reason']) - elif envelope._message_type == 'result': + set_last_task_error(envelope.message['domain'], + envelope.message['reason']) + elif envelope.message_type == 'result': assert self._result is None - self._result = envelope._message - elif envelope._message_type == 'child_data': + self._result = envelope.message + elif envelope.message_type == 'child_data': # If we retry a job, we assign a new value to this - self.child_data = envelope._message - else: - raise Exception() + self.child_data = envelope.message + + # Try Job subclass specific messages now + elif not self.handle_message(envelope.message_type, + envelope.message): + assert 0, "Unhandled message type '{}': {}" \ + .format(envelope.message_type, envelope.message) # _parent_process_queue() # |