summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <bschubert15@bloomberg.net>2020-12-05 17:08:27 +0000
committerBenjamin Schubert <bschubert15@bloomberg.net>2020-12-05 17:08:27 +0000
commit3f07e6056076736ea63f5ed5b96f240978691da1 (patch)
treef047c9f5d81c78534832f479eb51f7191f48cd55
parentded144e97de4c6bb4c211204761b0104e437df29 (diff)
downloadbuildstream-bschubert/optimize-job.tar.gz
job.py: Simplify handling of messages through the parent-child pipebschubert/optimize-job
Now that the only type of message that goes through are messages for the messenger, we can remove the enveloppe and only ever handle messenger's messages
-rw-r--r--src/buildstream/_scheduler/jobs/job.py53
1 files changed, 4 insertions, 49 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 322849a79..aa71b6e18 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -63,17 +63,6 @@ class JobStatus(FastEnum):
SKIPPED = 3
-# Used to distinguish between status messages and return values
-class _Envelope:
- def __init__(self, message_type, message):
- self.message_type = message_type
- self.message = message
-
-
-class _MessageType(FastEnum):
- LOG_MESSAGE = 1
-
-
# Job()
#
# The Job object represents a task that will run in parallel to the main
@@ -363,28 +352,6 @@ class Job:
self._pipe_r.close()
self._pipe_r = self._task = None
- # _parent_process_envelope()
- #
- # Processes a message Envelope deserialized form the message pipe.
- #
- # this will have the side effect of assigning some local state
- # on the Job in the parent process for later inspection when the
- # child process completes.
- #
- # Args:
- # envelope (Envelope): The message envelope
- #
- def _parent_process_envelope(self, envelope):
- if not self._listening:
- return
-
- if envelope.message_type is _MessageType.LOG_MESSAGE:
- # Propagate received messages from children
- # back through the context.
- self._messenger.message(envelope.message)
- else:
- assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
-
# _parent_process_pipe()
#
# Reads back message envelopes from the message pipe
@@ -393,11 +360,12 @@ class Job:
def _parent_process_pipe(self):
while self._pipe_r.poll():
try:
- envelope = self._pipe_r.recv()
+ message = self._pipe_r.recv()
except EOFError:
self._parent_stop_listening()
break
- self._parent_process_envelope(envelope)
+
+ self._messenger.message(message)
# _parent_recv()
#
@@ -629,19 +597,6 @@ class ChildJob:
# Local Private Methods #
#######################################################
- # _send_message()
- #
- # Send data in a message to the parent Job, running in the main process.
- #
- # Args:
- # message_type (str): The type of message to send.
- # message_data (any): A simple object (must be pickle-able, i.e.
- # strings, lists, dicts, numbers, but not Element
- # instances). This is sent to the parent Job.
- #
- def _send_message(self, message_type, message_data):
- self._pipe_w.send(_Envelope(message_type, message_data))
-
# _child_message_handler()
#
# A Context delegate for handling messages, this replaces the
@@ -669,4 +624,4 @@ class ChildJob:
if message.message_type == MessageType.LOG:
return
- self._send_message(_MessageType.LOG_MESSAGE, message)
+ self._pipe_w.send(message)