summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-08-22 16:22:27 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-09-10 10:44:53 +0000
commit0c583026c8a3ae94e905f57d43beda88808ea990 (patch)
treea3bebd1a198bc4eaf5d80d87ef2e67aae7443d06
parentbe757fab5e8dc3d603e8f61e6ffbbf00053467a6 (diff)
downloadbuildstream-0c583026c8a3ae94e905f57d43beda88808ea990.tar.gz
scheduler.py: Notification for sched loop running status
Add a notification for RUNNING, which is stored as a bool in Stream which is flipped when the scheduler notifies that the event loop has changed state.
-rw-r--r--src/buildstream/_scheduler/scheduler.py7
-rw-r--r--src/buildstream/_stream.py9
2 files changed, 13 insertions, 3 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 7ac0559b7..816da40e0 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -55,6 +55,7 @@ class NotificationType(FastEnum):
TERMINATE = "terminate"
QUIT = "quit"
SCHED_START_TIME = "sched_start_time"
+ RUNNING = "running"
# Notification()
@@ -162,6 +163,9 @@ class Scheduler():
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
+ # Notify that the loop has been created
+ self._notify(Notification(NotificationType.RUNNING))
+
# Add timeouts
self.loop.call_later(1, self._tick)
@@ -181,6 +185,9 @@ class Scheduler():
failed = any(queue.any_failed_elements() for queue in self.queues)
self.loop = None
+ # Notify that the loop has been reset
+ self._notify(Notification(NotificationType.RUNNING))
+
if failed:
status = SchedStatus.ERROR
elif self.terminated:
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 8f104a226..e346109fe 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -91,6 +91,7 @@ class Stream():
self._ticker_callback = ticker_callback
self._interrupt_callback = interrupt_callback
self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler
+ self._scheduler_running = False
# init()
#
@@ -1077,7 +1078,7 @@ class Stream():
#
@property
def running(self):
- return self._scheduler.loop is not None
+ return self._scheduler_running
# suspended
#
@@ -1650,8 +1651,8 @@ class Stream():
return element_targets, artifact_refs
def _scheduler_notification_handler(self):
- # Check the queue is there and a scheduler is running
- assert self._notification_queue and self.running
+ # Check the queue is there
+ assert self._notification_queue
notification = self._notification_queue.pop()
if notification.notification_type == NotificationType.INTERRUPT:
@@ -1667,6 +1668,8 @@ class Stream():
notification.element)
elif notification.notification_type == NotificationType.SCHED_START_TIME:
self._starttime = notification.time
+ elif notification.notification_type == NotificationType.RUNNING:
+ self._scheduler_running = not self._scheduler_running
else:
raise StreamError("Unrecognised notification type received")