summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildstream/_scheduler/jobs/job.py69
1 files changed, 50 insertions, 19 deletions
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 348204750..b8b4a2c76 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -58,10 +58,10 @@ class JobStatus():
# Used to distinguish between status messages and return values
-class Envelope():
+class _Envelope():
def __init__(self, message_type, message):
- self._message_type = message_type
- self._message = message
+ self.message_type = message_type
+ self.message = message
# Process class that doesn't call waitpid on its own.
@@ -263,10 +263,37 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
+ # send_message()
+ #
+ # To be called from inside Job.child_process() implementations
+ # to send messages to the main process during processing.
+ #
+ # These messages will be processed by the class's Job.handle_message()
+ # implementation.
+ #
+ def send_message(self, message_type, message):
+ self._queue.put(_Envelope(message_type, message))
+
#######################################################
# Abstract Methods #
#######################################################
+ # handle_message()
+ #
+ # Handle a custom message. This will be called in the main process in
+ # response to any messages sent to the main proces using the
+ # Job.send_message() API from inside a Job.child_process() implementation
+ #
+ # Args:
+ # message_type (str): A string to identify the message type
+ # message (any): A simple serializable object
+ #
+ # Returns:
+ # (bool): Should return a truthy value if message_type is handled.
+ #
+ def handle_message(self, message_type, message):
+ return False
+
# parent_complete()
#
# This will be executed after the job finishes, and is expected to
@@ -404,7 +431,7 @@ class Job():
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
- self._queue.put(Envelope('child_data', self.child_process_data()))
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
@@ -430,7 +457,7 @@ class Job():
else:
# No exception occurred in the action
- self._queue.put(Envelope('child_data', self.child_process_data()))
+ self._queue.put(_Envelope('child_data', self.child_process_data()))
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
@@ -457,7 +484,7 @@ class Job():
domain = e.domain
reason = e.reason
- envelope = Envelope('error', {
+ envelope = _Envelope('error', {
'domain': domain,
'reason': reason
})
@@ -475,7 +502,7 @@ class Job():
#
def _child_send_result(self, result):
if result is not None:
- envelope = Envelope('result', result)
+ envelope = _Envelope('result', result)
self._queue.put(envelope)
# _child_shutdown()
@@ -512,7 +539,7 @@ class Job():
if message.message_type == MessageType.LOG:
return
- self._queue.put(Envelope('message', message))
+ self._queue.put(_Envelope('message', message))
# _parent_shutdown()
#
@@ -573,24 +600,28 @@ class Job():
if not self._listening:
return
- if envelope._message_type == 'message':
+ if envelope.message_type == 'message':
# Propagate received messages from children
# back through the context.
- self._scheduler.context.message(envelope._message)
- elif envelope._message_type == 'error':
+ self._scheduler.context.message(envelope.message)
+ elif envelope.message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
- set_last_task_error(envelope._message['domain'],
- envelope._message['reason'])
- elif envelope._message_type == 'result':
+ set_last_task_error(envelope.message['domain'],
+ envelope.message['reason'])
+ elif envelope.message_type == 'result':
assert self._result is None
- self._result = envelope._message
- elif envelope._message_type == 'child_data':
+ self._result = envelope.message
+ elif envelope.message_type == 'child_data':
# If we retry a job, we assign a new value to this
- self.child_data = envelope._message
- else:
- raise Exception()
+ self.child_data = envelope.message
+
+ # Try Job subclass specific messages now
+ elif not self.handle_message(envelope.message_type,
+ envelope.message):
+ assert 0, "Unhandled message type '{}': {}" \
+ .format(envelope.message_type, envelope.message)
# _parent_process_queue()
#