author Angelos Evripiotis <jevripiotis@bloomberg.net> 2019-05-07 17:02:18 +0100
committer Angelos Evripiotis <angelos.evripiotis@gmail.com> 2019-05-22 09:32:20 +0000
commit 5da43b3151ca51050d29c213fa536e839cd58af6 (patch)
tree a6d45e3f9cfef0719ffac3b0eac17b41e7866604
parent d43ba46b11f1356d6014b3aa8593e896a45aed58 (diff)
download buildstream-5da43b3151ca51050d29c213fa536e839cd58af6.tar.gz
_scheduler/jobs/job: mv _parent* above _child*
Move the parent-specific methods above the child-specific methods. This makes slightly more sense chronologically, as the parent creates the child. It will also make diffs cleaner when splitting parent and child into separate classes.
-rw-r--r-- src/buildstream/_scheduler/jobs/job.py 262
1 file changed, 131 insertions, 131 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index dd91d1634..1c19d0221 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -370,6 +370,137 @@ class Job():
    #
    #######################################################
+    # _parent_shutdown()
+    #
+    # Shuts down the Job on the parent side by reading any remaining
+    # messages on the message queue and cleaning up any resources.
+    #
+    def _parent_shutdown(self):
+        # Make sure we've read everything we need and then stop listening
+        self._parent_process_queue()
+        self._parent_stop_listening()
+
+    # _parent_child_completed()
+    #
+    # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
+    #
+    # Args:
+    #    pid (int): The PID of the child which completed
+    #    returncode (int): The return code of the child process
+    #
+    def _parent_child_completed(self, pid, returncode):
+        self._parent_shutdown()
+
+        # We don't want to retry if we got OK or a permanent fail.
+        retry_flag = returncode == RC_FAIL
+
+        if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
+            self.spawn()
+            return
+
+        # Resolve the outward facing overall job completion status
+        #
+        if returncode == RC_OK:
+            status = JobStatus.OK
+        elif returncode == RC_SKIPPED:
+            status = JobStatus.SKIPPED
+        elif returncode in (RC_FAIL, RC_PERM_FAIL):
+            status = JobStatus.FAIL
+        else:
+            status = JobStatus.FAIL
+
+        self.parent_complete(status, self._result)
+        self._scheduler.job_completed(self, status)
+
+        # Force the deletion of the queue and process objects to try and clean up FDs
+        self._queue = self._process = None
+
+    # _parent_process_envelope()
+    #
+    # Processes a message Envelope deserialized from the message queue.
+    #
+    # This will have the side effect of assigning some local state
+    # on the Job in the parent process for later inspection when the
+    # child process completes.
+    #
+    # Args:
+    #    envelope (Envelope): The message envelope
+    #
+    def _parent_process_envelope(self, envelope):
+        if not self._listening:
+            return
+
+        if envelope.message_type == 'message':
+            # Propagate received messages from children
+            # back through the context.
+            self._scheduler.context.message(envelope.message)
+        elif envelope.message_type == 'error':
+            # For regression tests only, save the last error domain / reason
+            # reported from a child task in the main process; this global state
+            # is currently managed in _exceptions.py
+            set_last_task_error(envelope.message['domain'],
+                                envelope.message['reason'])
+        elif envelope.message_type == 'result':
+            assert self._result is None
+            self._result = envelope.message
+        elif envelope.message_type == 'child_data':
+            # If we retry a job, we assign a new value to this
+            self.child_data = envelope.message
+
+        # Try Job subclass specific messages now
+        elif not self.handle_message(envelope.message_type,
+                                     envelope.message):
+            assert 0, "Unhandled message type '{}': {}" \
+                .format(envelope.message_type, envelope.message)
+
+    # _parent_process_queue()
+    #
+    # Reads back message envelopes from the message queue
+    # in the parent process.
+    #
+    def _parent_process_queue(self):
+        while not self._queue.empty():
+            envelope = self._queue.get_nowait()
+            self._parent_process_envelope(envelope)
+
+    # _parent_recv()
+    #
+    # A callback to handle I/O events from the message
+    # queue file descriptor in the main process message loop
+    #
+    def _parent_recv(self, *args):
+        self._parent_process_queue()
+
+    # _parent_start_listening()
+    #
+    # Starts listening on the message queue
+    #
+    def _parent_start_listening(self):
+        # Warning: Platform specific code up ahead
+        #
+        #   The multiprocessing.Queue object does not tell us how
+        #   to receive I/O events in the receiving process, so we
+        #   need to sneak in and get its file descriptor.
+        #
+        #   The _reader member of the Queue is currently private
+        #   but well known, perhaps it will become public:
+        #
+        #       http://bugs.python.org/issue3831
+        #
+        if not self._listening:
+            self._scheduler.loop.add_reader(
+                self._queue._reader.fileno(), self._parent_recv)
+            self._listening = True
+
+    # _parent_stop_listening()
+    #
+    # Stops listening on the message queue
+    #
+    def _parent_stop_listening(self):
+        if self._listening:
+            self._scheduler.loop.remove_reader(self._queue._reader.fileno())
+            self._listening = False
+
    # _child_action()
    #
    # Perform the action in the child process; this calls the action_cb.
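
(Aside, not part of the change: a minimal, Unix-only sketch of the listening technique the added comments describe. The parent borrows the private _reader file descriptor of a multiprocessing.Queue, registers it with the asyncio loop via add_reader(), and drains the queue with get_nowait() on each wakeup, just as _parent_process_queue() does. The child(), on_readable() and finished names and the (message_type, payload) tuples are illustrative, not BuildStream code.)

import asyncio
import multiprocessing


def child(queue):
    # Child side: send a few (message_type, payload) envelopes,
    # then a sentinel so the parent knows to stop.
    for i in range(3):
        queue.put(('message', 'hello {}'.format(i)))
    queue.put(('done', None))


def main():
    queue = multiprocessing.Queue()
    loop = asyncio.new_event_loop()
    finished = loop.create_future()

    def on_readable():
        # Drain everything currently queued, as _parent_process_queue()
        # does, so a single I/O event cannot strand later envelopes.
        while not queue.empty():
            message_type, payload = queue.get_nowait()
            if message_type == 'done':
                finished.set_result(None)
            else:
                print(message_type, payload)

    # The _reader member is private but well known, see
    # http://bugs.python.org/issue3831
    loop.add_reader(queue._reader.fileno(), on_readable)

    process = multiprocessing.Process(target=child, args=(queue,))
    process.start()
    loop.run_until_complete(finished)
    loop.remove_reader(queue._reader.fileno())
    process.join()
    loop.close()


if __name__ == '__main__':
    main()
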
@@ -549,134 +680,3 @@ class Job():
            return
        self._queue.put(_Envelope('message', message))
-
-    # _parent_shutdown()
-    #
-    # Shuts down the Job on the parent side by reading any remaining
-    # messages on the message queue and cleaning up any resources.
-    #
-    def _parent_shutdown(self):
-        # Make sure we've read everything we need and then stop listening
-        self._parent_process_queue()
-        self._parent_stop_listening()
-
-    # _parent_child_completed()
-    #
-    # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
-    #
-    # Args:
-    #    pid (int): The PID of the child which completed
-    #    returncode (int): The return code of the child process
-    #
-    def _parent_child_completed(self, pid, returncode):
-        self._parent_shutdown()
-
-        # We don't want to retry if we got OK or a permanent fail.
-        retry_flag = returncode == RC_FAIL
-
-        if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
-            self.spawn()
-            return
-
-        # Resolve the outward facing overall job completion status
-        #
-        if returncode == RC_OK:
-            status = JobStatus.OK
-        elif returncode == RC_SKIPPED:
-            status = JobStatus.SKIPPED
-        elif returncode in (RC_FAIL, RC_PERM_FAIL):
-            status = JobStatus.FAIL
-        else:
-            status = JobStatus.FAIL
-
-        self.parent_complete(status, self._result)
-        self._scheduler.job_completed(self, status)
-
-        # Force the deletion of the queue and process objects to try and clean up FDs
-        self._queue = self._process = None
-
-    # _parent_process_envelope()
-    #
-    # Processes a message Envelope deserialized from the message queue.
-    #
-    # This will have the side effect of assigning some local state
-    # on the Job in the parent process for later inspection when the
-    # child process completes.
-    #
-    # Args:
-    #    envelope (Envelope): The message envelope
-    #
-    def _parent_process_envelope(self, envelope):
-        if not self._listening:
-            return
-
-        if envelope.message_type == 'message':
-            # Propagate received messages from children
-            # back through the context.
-            self._scheduler.context.message(envelope.message)
-        elif envelope.message_type == 'error':
-            # For regression tests only, save the last error domain / reason
-            # reported from a child task in the main process; this global state
-            # is currently managed in _exceptions.py
-            set_last_task_error(envelope.message['domain'],
-                                envelope.message['reason'])
-        elif envelope.message_type == 'result':
-            assert self._result is None
-            self._result = envelope.message
-        elif envelope.message_type == 'child_data':
-            # If we retry a job, we assign a new value to this
-            self.child_data = envelope.message
-
-        # Try Job subclass specific messages now
-        elif not self.handle_message(envelope.message_type,
-                                     envelope.message):
-            assert 0, "Unhandled message type '{}': {}" \
-                .format(envelope.message_type, envelope.message)
-
-    # _parent_process_queue()
-    #
-    # Reads back message envelopes from the message queue
-    # in the parent process.
-    #
-    def _parent_process_queue(self):
-        while not self._queue.empty():
-            envelope = self._queue.get_nowait()
-            self._parent_process_envelope(envelope)
-
-    # _parent_recv()
-    #
-    # A callback to handle I/O events from the message
-    # queue file descriptor in the main process message loop
-    #
-    def _parent_recv(self, *args):
-        self._parent_process_queue()
-
-    # _parent_start_listening()
-    #
-    # Starts listening on the message queue
-    #
-    def _parent_start_listening(self):
-        # Warning: Platform specific code up ahead
-        #
-        #   The multiprocessing.Queue object does not tell us how
-        #   to receive I/O events in the receiving process, so we
-        #   need to sneak in and get its file descriptor.
-        #
-        #   The _reader member of the Queue is currently private
-        #   but well known, perhaps it will become public:
-        #
-        #       http://bugs.python.org/issue3831
-        #
-        if not self._listening:
-            self._scheduler.loop.add_reader(
-                self._queue._reader.fileno(), self._parent_recv)
-            self._listening = True
-
-    # _parent_stop_listening()
-    #
-    # Stops listening on the message queue
-    #
-    def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._queue._reader.fileno())
-            self._listening = False
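
(Aside, not part of the change: _parent_child_completed() is registered elsewhere in the scheduler via asyncio's ChildWatcher.add_child_handler(), as its docstring notes. Below is a rough, Unix-only sketch of that mechanism. It uses os.fork() for brevity where BuildStream spawns a multiprocessing.Process, and note that asyncio.get_child_watcher() has since been deprecated in recent Python releases.)

import asyncio
import os


async def wait_for_child():
    loop = asyncio.get_running_loop()
    completed = loop.create_future()

    def child_completed(pid, returncode):
        # Runs in the parent process when the child exits, in the same
        # way that Job._parent_child_completed() is invoked.
        completed.set_result((pid, returncode))

    pid = os.fork()
    if pid == 0:
        # Child process: exit immediately with a known return code.
        os._exit(7)

    # Ask asyncio to waitpid() on our behalf and call back on completion.
    asyncio.get_child_watcher().add_child_handler(pid, child_completed)
    return await completed


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print(loop.run_until_complete(wait_for_child()))  # e.g. (12345, 7)
loop.close()
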