diff options
author | Benjamin Schubert <bschubert15@bloomberg.net> | 2020-12-05 17:08:27 +0000 |
---|---|---|
committer | Benjamin Schubert <bschubert15@bloomberg.net> | 2020-12-05 17:08:27 +0000 |
commit | 3f07e6056076736ea63f5ed5b96f240978691da1 (patch) | |
tree | f047c9f5d81c78534832f479eb51f7191f48cd55 | |
parent | ded144e97de4c6bb4c211204761b0104e437df29 (diff) | |
download | buildstream-bschubert/optimize-job.tar.gz |
job.py: Simplify handling of messages through the parent-child pipebschubert/optimize-job
Now that the only type of message that goes through are messages for the
messenger, we can remove the enveloppe and only ever handle messenger's
messages
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 53 |
1 files changed, 4 insertions, 49 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 322849a79..aa71b6e18 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -63,17 +63,6 @@ class JobStatus(FastEnum): SKIPPED = 3 -# Used to distinguish between status messages and return values -class _Envelope: - def __init__(self, message_type, message): - self.message_type = message_type - self.message = message - - -class _MessageType(FastEnum): - LOG_MESSAGE = 1 - - # Job() # # The Job object represents a task that will run in parallel to the main @@ -363,28 +352,6 @@ class Job: self._pipe_r.close() self._pipe_r = self._task = None - # _parent_process_envelope() - # - # Processes a message Envelope deserialized form the message pipe. - # - # this will have the side effect of assigning some local state - # on the Job in the parent process for later inspection when the - # child process completes. - # - # Args: - # envelope (Envelope): The message envelope - # - def _parent_process_envelope(self, envelope): - if not self._listening: - return - - if envelope.message_type is _MessageType.LOG_MESSAGE: - # Propagate received messages from children - # back through the context. - self._messenger.message(envelope.message) - else: - assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message) - # _parent_process_pipe() # # Reads back message envelopes from the message pipe @@ -393,11 +360,12 @@ class Job: def _parent_process_pipe(self): while self._pipe_r.poll(): try: - envelope = self._pipe_r.recv() + message = self._pipe_r.recv() except EOFError: self._parent_stop_listening() break - self._parent_process_envelope(envelope) + + self._messenger.message(message) # _parent_recv() # @@ -629,19 +597,6 @@ class ChildJob: # Local Private Methods # ####################################################### - # _send_message() - # - # Send data in a message to the parent Job, running in the main process. - # - # Args: - # message_type (str): The type of message to send. - # message_data (any): A simple object (must be pickle-able, i.e. - # strings, lists, dicts, numbers, but not Element - # instances). This is sent to the parent Job. - # - def _send_message(self, message_type, message_data): - self._pipe_w.send(_Envelope(message_type, message_data)) - # _child_message_handler() # # A Context delegate for handling messages, this replaces the @@ -669,4 +624,4 @@ class ChildJob: if message.message_type == MessageType.LOG: return - self._send_message(_MessageType.LOG_MESSAGE, message) + self._pipe_w.send(message) |