summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/buildstream/_scheduler/scheduler.py18
-rw-r--r--src/buildstream/_stream.py16
2 files changed, 7 insertions, 27 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 0c94fd376..a2f1c241f 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -54,8 +54,6 @@ class SchedStatus(FastEnum):
# to be used as a conditional for control or state handling.
#
class NotificationType(FastEnum):
- INTERRUPT = "interrupt"
- TICK = "tick"
TERMINATE = "terminate"
QUIT = "quit"
SUSPEND = "suspend"
@@ -112,7 +110,7 @@ class Notification:
# ticker_callback: A callback call once per second
#
class Scheduler:
- def __init__(self, context, start_time, state, notification_queue, notifier):
+ def __init__(self, context, start_time, state, notification_queue, interrupt_callback, ticker_callback):
#
# Public members
@@ -137,9 +135,11 @@ class Scheduler:
self._sched_handle = None # Whether a scheduling job is already scheduled or not
+ self._ticker_callback = ticker_callback
+ self._interrupt_callback = interrupt_callback
+
# Bidirectional queue to send notifications back to the Scheduler's owner
self._notification_queue = notification_queue
- self._notifier = notifier
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
self._state.register_task_retry_callback(self._failure_retry)
@@ -487,8 +487,7 @@ class Scheduler:
if self.terminated:
return
- notification = Notification(NotificationType.INTERRUPT)
- self._notify(notification)
+ self._interrupt_callback()
# _terminate_event():
#
@@ -541,7 +540,7 @@ class Scheduler:
# Regular timeout for driving status in the UI
def _tick(self):
- self._notify(Notification(NotificationType.TICK))
+ self._ticker_callback()
self.loop.call_later(1, self._tick)
def _failure_retry(self, action_name, unique_id):
@@ -556,11 +555,6 @@ class Scheduler:
queue._task_group.failed_tasks.remove(element._get_full_name())
queue.enqueue([element])
- def _notify(self, notification):
- # Scheduler to Stream notifcations on right side
- self._notification_queue.append(notification)
- self._notifier()
-
def _stream_notification_handler(self):
notification = self._notification_queue.popleft()
if notification.notification_type == NotificationType.TERMINATE:
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 7c8baf233..b8f42f582 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -94,11 +94,9 @@ class Stream:
context.messenger.set_state(self._state)
self._scheduler = Scheduler(
- context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler
+ context, session_start, self._state, self._notification_queue, interrupt_callback, ticker_callback
)
self._session_start_callback = session_start_callback
- self._ticker_callback = ticker_callback
- self._interrupt_callback = interrupt_callback
self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler
self._running = False
self._terminated = False
@@ -1647,18 +1645,6 @@ class Stream:
return element_targets, artifact_refs
- def _scheduler_notification_handler(self):
- # Check the queue is there
- assert self._notification_queue
- notification = self._notification_queue.pop()
-
- if notification.notification_type == NotificationType.INTERRUPT:
- self._interrupt_callback()
- elif notification.notification_type == NotificationType.TICK:
- self._ticker_callback()
- else:
- raise StreamError("Unrecognised notification type received: {}".format(notification.notification_type))
-
def _notify(self, notification):
# Stream to scheduler notifcations on left side
self._notification_queue.appendleft(notification)