diff options
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 18 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 16 |
2 files changed, 7 insertions, 27 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 0c94fd376..a2f1c241f 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -54,8 +54,6 @@ class SchedStatus(FastEnum): # to be used as a conditional for control or state handling. # class NotificationType(FastEnum): - INTERRUPT = "interrupt" - TICK = "tick" TERMINATE = "terminate" QUIT = "quit" SUSPEND = "suspend" @@ -112,7 +110,7 @@ class Notification: # ticker_callback: A callback call once per second # class Scheduler: - def __init__(self, context, start_time, state, notification_queue, notifier): + def __init__(self, context, start_time, state, notification_queue, interrupt_callback, ticker_callback): # # Public members @@ -137,9 +135,11 @@ class Scheduler: self._sched_handle = None # Whether a scheduling job is already scheduled or not + self._ticker_callback = ticker_callback + self._interrupt_callback = interrupt_callback + # Bidirectional queue to send notifications back to the Scheduler's owner self._notification_queue = notification_queue - self._notifier = notifier self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) self._state.register_task_retry_callback(self._failure_retry) @@ -487,8 +487,7 @@ class Scheduler: if self.terminated: return - notification = Notification(NotificationType.INTERRUPT) - self._notify(notification) + self._interrupt_callback() # _terminate_event(): # @@ -541,7 +540,7 @@ class Scheduler: # Regular timeout for driving status in the UI def _tick(self): - self._notify(Notification(NotificationType.TICK)) + self._ticker_callback() self.loop.call_later(1, self._tick) def _failure_retry(self, action_name, unique_id): @@ -556,11 +555,6 @@ class Scheduler: queue._task_group.failed_tasks.remove(element._get_full_name()) queue.enqueue([element]) - def _notify(self, notification): - # Scheduler to Stream notifcations on right side - self._notification_queue.append(notification) - self._notifier() - def _stream_notification_handler(self): notification = self._notification_queue.popleft() if notification.notification_type == NotificationType.TERMINATE: diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 7c8baf233..b8f42f582 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -94,11 +94,9 @@ class Stream: context.messenger.set_state(self._state) self._scheduler = Scheduler( - context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler + context, session_start, self._state, self._notification_queue, interrupt_callback, ticker_callback ) self._session_start_callback = session_start_callback - self._ticker_callback = ticker_callback - self._interrupt_callback = interrupt_callback self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler self._running = False self._terminated = False @@ -1647,18 +1645,6 @@ class Stream: return element_targets, artifact_refs - def _scheduler_notification_handler(self): - # Check the queue is there - assert self._notification_queue - notification = self._notification_queue.pop() - - if notification.notification_type == NotificationType.INTERRUPT: - self._interrupt_callback() - elif notification.notification_type == NotificationType.TICK: - self._ticker_callback() - else: - raise StreamError("Unrecognised notification type received: {}".format(notification.notification_type)) - def _notify(self, notification): # Stream to scheduler notifcations on left side self._notification_queue.appendleft(notification) |