summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-10-02 11:30:10 +0100
committerTom Pollard <tom.pollard@codethink.co.uk>2019-11-25 14:28:15 +0000
commita4bafc125655b73a81cc2e0421f02e508b58660e (patch)
tree0ace38e20582d40a5a6af8ed42044df919382117
parentbf1741582db3316e00d980e8b012b5b35d64d635 (diff)
downloadbuildstream-a4bafc125655b73a81cc2e0421f02e508b58660e.tar.gz
Move sched notification poll to loop reader
-rw-r--r--src/buildstream/_scheduler/scheduler.py30
1 files changed, 16 insertions, 14 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 3476162e3..87853f050 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -203,15 +203,16 @@ class Scheduler:
_watcher = asyncio.get_child_watcher()
_watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
- # Add notification handler
- if self._notify_back_queue:
- self.loop.call_later(0.01, self._loop)
+ # Add notification listener if in subprocess
+ self._start_listening()
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
# Run the queues
self._sched()
self.loop.run_forever()
+ # Stop listening for notifications
+ self._stop_listening()
self.loop.close()
# Stop watching casd
@@ -387,7 +388,7 @@ class Scheduler:
#
def _abort_on_casd_failure(self, pid, returncode):
message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
- self._notify(Notification(NotificationType.MESSAGE, message=message))
+ self._notify_front(Notification(NotificationType.MESSAGE, message=message))
self._casd_process.returncode = returncode
self.terminate_jobs()
@@ -621,16 +622,17 @@ class Scheduler:
raise ValueError("Unrecognised notification type received")
def _loop(self):
- assert self._notify_back_queue
- # Check for and process new messages
- while True:
- try:
- notification = self._notify_back_queue.get_nowait()
- self._notification_handler(notification)
- except queue.Empty:
- notification = None
- break
- self.loop.call_later(0.01, self._loop)
+ while not self._notify_back_queue.empty():
+ notification = self._notify_back_queue.get_nowait()
+ self._notification_handler(notification)
+
+ def _start_listening(self):
+ if self._notify_back_queue:
+ self.loop.add_reader(self._notify_back_queue._reader.fileno(), self._loop)
+
+ def _stop_listening(self):
+ if self._notify_back_queue:
+ self.loop.remove_reader(self._notify_back_queue._reader.fileno())
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing