diff options
author | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-10-02 11:30:10 +0100 |
---|---|---|
committer | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-11-25 14:28:15 +0000 |
commit | a4bafc125655b73a81cc2e0421f02e508b58660e (patch) | |
tree | 0ace38e20582d40a5a6af8ed42044df919382117 | |
parent | bf1741582db3316e00d980e8b012b5b35d64d635 (diff) | |
download | buildstream-a4bafc125655b73a81cc2e0421f02e508b58660e.tar.gz |
Move sched notification poll to loop reader
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 30 |
1 files changed, 16 insertions, 14 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 3476162e3..87853f050 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -203,15 +203,16 @@ class Scheduler: _watcher = asyncio.get_child_watcher() _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure) - # Add notification handler - if self._notify_back_queue: - self.loop.call_later(0.01, self._loop) + # Add notification listener if in subprocess + self._start_listening() # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): # Run the queues self._sched() self.loop.run_forever() + # Stop listening for notifications + self._stop_listening() self.loop.close() # Stop watching casd @@ -387,7 +388,7 @@ class Scheduler: # def _abort_on_casd_failure(self, pid, returncode): message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.") - self._notify(Notification(NotificationType.MESSAGE, message=message)) + self._notify_front(Notification(NotificationType.MESSAGE, message=message)) self._casd_process.returncode = returncode self.terminate_jobs() @@ -621,16 +622,17 @@ class Scheduler: raise ValueError("Unrecognised notification type received") def _loop(self): - assert self._notify_back_queue - # Check for and process new messages - while True: - try: - notification = self._notify_back_queue.get_nowait() - self._notification_handler(notification) - except queue.Empty: - notification = None - break - self.loop.call_later(0.01, self._loop) + while not self._notify_back_queue.empty(): + notification = self._notify_back_queue.get_nowait() + self._notification_handler(notification) + + def _start_listening(self): + if self._notify_back_queue: + self.loop.add_reader(self._notify_back_queue._reader.fileno(), self._loop) + + def _stop_listening(self): + if self._notify_back_queue: + self.loop.remove_reader(self._notify_back_queue._reader.fileno()) def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing |