diff options
author | Angelos Evripiotis <jevripiotis@bloomberg.net> | 2019-05-07 17:02:18 +0100 |
---|---|---|
committer | Angelos Evripiotis <angelos.evripiotis@gmail.com> | 2019-05-22 09:32:20 +0000 |
commit | 5da43b3151ca51050d29c213fa536e839cd58af6 (patch) | |
tree | a6d45e3f9cfef0719ffac3b0eac17b41e7866604 | |
parent | d43ba46b11f1356d6014b3aa8593e896a45aed58 (diff) | |
download | buildstream-5da43b3151ca51050d29c213fa536e839cd58af6.tar.gz |
_scheduler/jobs/job: mv _parent* above _child*
Move the parent-specific methods above the child-specific methods.
This makes slightly more sense chronologically, as the parent creates
the child. It will also make diffs cleaner when splitting parent and
child into separate classes.
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 262 |
1 files changed, 131 insertions, 131 deletions
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index dd91d1634..1c19d0221 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -370,6 +370,137 @@ class Job(): # ####################################################### + # _parent_shutdown() + # + # Shuts down the Job on the parent side by reading any remaining + # messages on the message queue and cleaning up any resources. + # + def _parent_shutdown(self): + # Make sure we've read everything we need and then stop listening + self._parent_process_queue() + self._parent_stop_listening() + + # _parent_child_completed() + # + # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler() + # + # Args: + # pid (int): The PID of the child which completed + # returncode (int): The return code of the child process + # + def _parent_child_completed(self, pid, returncode): + self._parent_shutdown() + + # We don't want to retry if we got OK or a permanent fail. + retry_flag = returncode == RC_FAIL + + if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: + self.spawn() + return + + # Resolve the outward facing overall job completion status + # + if returncode == RC_OK: + status = JobStatus.OK + elif returncode == RC_SKIPPED: + status = JobStatus.SKIPPED + elif returncode in (RC_FAIL, RC_PERM_FAIL): + status = JobStatus.FAIL + else: + status = JobStatus.FAIL + + self.parent_complete(status, self._result) + self._scheduler.job_completed(self, status) + + # Force the deletion of the queue and process objects to try and clean up FDs + self._queue = self._process = None + + # _parent_process_envelope() + # + # Processes a message Envelope deserialized form the message queue. + # + # this will have the side effect of assigning some local state + # on the Job in the parent process for later inspection when the + # child process completes. + # + # Args: + # envelope (Envelope): The message envelope + # + def _parent_process_envelope(self, envelope): + if not self._listening: + return + + if envelope.message_type == 'message': + # Propagate received messages from children + # back through the context. + self._scheduler.context.message(envelope.message) + elif envelope.message_type == 'error': + # For regression tests only, save the last error domain / reason + # reported from a child task in the main process, this global state + # is currently managed in _exceptions.py + set_last_task_error(envelope.message['domain'], + envelope.message['reason']) + elif envelope.message_type == 'result': + assert self._result is None + self._result = envelope.message + elif envelope.message_type == 'child_data': + # If we retry a job, we assign a new value to this + self.child_data = envelope.message + + # Try Job subclass specific messages now + elif not self.handle_message(envelope.message_type, + envelope.message): + assert 0, "Unhandled message type '{}': {}" \ + .format(envelope.message_type, envelope.message) + + # _parent_process_queue() + # + # Reads back message envelopes from the message queue + # in the parent process. + # + def _parent_process_queue(self): + while not self._queue.empty(): + envelope = self._queue.get_nowait() + self._parent_process_envelope(envelope) + + # _parent_recv() + # + # A callback to handle I/O events from the message + # queue file descriptor in the main process message loop + # + def _parent_recv(self, *args): + self._parent_process_queue() + + # _parent_start_listening() + # + # Starts listening on the message queue + # + def _parent_start_listening(self): + # Warning: Platform specific code up ahead + # + # The multiprocessing.Queue object does not tell us how + # to receive io events in the receiving process, so we + # need to sneak in and get its file descriptor. + # + # The _reader member of the Queue is currently private + # but well known, perhaps it will become public: + # + # http://bugs.python.org/issue3831 + # + if not self._listening: + self._scheduler.loop.add_reader( + self._queue._reader.fileno(), self._parent_recv) + self._listening = True + + # _parent_stop_listening() + # + # Stops listening on the message queue + # + def _parent_stop_listening(self): + if self._listening: + self._scheduler.loop.remove_reader(self._queue._reader.fileno()) + self._listening = False + # _child_action() # # Perform the action in the child process, this calls the action_cb. @@ -549,134 +680,3 @@ class Job(): return self._queue.put(_Envelope('message', message)) - - # _parent_shutdown() - # - # Shuts down the Job on the parent side by reading any remaining - # messages on the message queue and cleaning up any resources. - # - def _parent_shutdown(self): - # Make sure we've read everything we need and then stop listening - self._parent_process_queue() - self._parent_stop_listening() - - # _parent_child_completed() - # - # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler() - # - # Args: - # pid (int): The PID of the child which completed - # returncode (int): The return code of the child process - # - def _parent_child_completed(self, pid, returncode): - self._parent_shutdown() - - # We don't want to retry if we got OK or a permanent fail. - retry_flag = returncode == RC_FAIL - - if retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: - self.spawn() - return - - # Resolve the outward facing overall job completion status - # - if returncode == RC_OK: - status = JobStatus.OK - elif returncode == RC_SKIPPED: - status = JobStatus.SKIPPED - elif returncode in (RC_FAIL, RC_PERM_FAIL): - status = JobStatus.FAIL - else: - status = JobStatus.FAIL - - self.parent_complete(status, self._result) - self._scheduler.job_completed(self, status) - - # Force the deletion of the queue and process objects to try and clean up FDs - self._queue = self._process = None - - # _parent_process_envelope() - # - # Processes a message Envelope deserialized form the message queue. - # - # this will have the side effect of assigning some local state - # on the Job in the parent process for later inspection when the - # child process completes. - # - # Args: - # envelope (Envelope): The message envelope - # - def _parent_process_envelope(self, envelope): - if not self._listening: - return - - if envelope.message_type == 'message': - # Propagate received messages from children - # back through the context. - self._scheduler.context.message(envelope.message) - elif envelope.message_type == 'error': - # For regression tests only, save the last error domain / reason - # reported from a child task in the main process, this global state - # is currently managed in _exceptions.py - set_last_task_error(envelope.message['domain'], - envelope.message['reason']) - elif envelope.message_type == 'result': - assert self._result is None - self._result = envelope.message - elif envelope.message_type == 'child_data': - # If we retry a job, we assign a new value to this - self.child_data = envelope.message - - # Try Job subclass specific messages now - elif not self.handle_message(envelope.message_type, - envelope.message): - assert 0, "Unhandled message type '{}': {}" \ - .format(envelope.message_type, envelope.message) - - # _parent_process_queue() - # - # Reads back message envelopes from the message queue - # in the parent process. - # - def _parent_process_queue(self): - while not self._queue.empty(): - envelope = self._queue.get_nowait() - self._parent_process_envelope(envelope) - - # _parent_recv() - # - # A callback to handle I/O events from the message - # queue file descriptor in the main process message loop - # - def _parent_recv(self, *args): - self._parent_process_queue() - - # _parent_start_listening() - # - # Starts listening on the message queue - # - def _parent_start_listening(self): - # Warning: Platform specific code up ahead - # - # The multiprocessing.Queue object does not tell us how - # to receive io events in the receiving process, so we - # need to sneak in and get its file descriptor. - # - # The _reader member of the Queue is currently private - # but well known, perhaps it will become public: - # - # http://bugs.python.org/issue3831 - # - if not self._listening: - self._scheduler.loop.add_reader( - self._queue._reader.fileno(), self._parent_recv) - self._listening = True - - # _parent_stop_listening() - # - # Stops listening on the message queue - # - def _parent_stop_listening(self): - if self._listening: - self._scheduler.loop.remove_reader(self._queue._reader.fileno()) - self._listening = False |