summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <contact@benschubert.me>2020-07-04 18:05:21 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2020-07-06 18:07:53 +0000
commit61dd3d603d66bd8e92e87060e1d46993c71badf3 (patch)
treeed64d285b589de8f39b879595f5c82b8b377f2fd
parent598345c76510bc703e3f488a5fd8c0b1170509e3 (diff)
downloadbuildstream-61dd3d603d66bd8e92e87060e1d46993c71badf3.tar.gz
scheduler.py: Remove notifications from scheduler to stream
This removes all notifications left coming from the scheduler, and replaces them by callbacks
-rw-r--r--src/buildstream/_scheduler/scheduler.py18
-rw-r--r--src/buildstream/_stream.py16
2 files changed, 7 insertions, 27 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 0c94fd376..a2f1c241f 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -54,8 +54,6 @@ class SchedStatus(FastEnum):
# to be used as a conditional for control or state handling.
#
class NotificationType(FastEnum):
- INTERRUPT = "interrupt"
- TICK = "tick"
TERMINATE = "terminate"
QUIT = "quit"
SUSPEND = "suspend"
@@ -112,7 +110,7 @@ class Notification:
# ticker_callback: A callback call once per second
#
class Scheduler:
- def __init__(self, context, start_time, state, notification_queue, notifier):
+ def __init__(self, context, start_time, state, notification_queue, interrupt_callback, ticker_callback):
#
# Public members
@@ -137,9 +135,11 @@ class Scheduler:
self._sched_handle = None # Whether a scheduling job is already scheduled or not
+ self._ticker_callback = ticker_callback
+ self._interrupt_callback = interrupt_callback
+
# Bidirectional queue to send notifications back to the Scheduler's owner
self._notification_queue = notification_queue
- self._notifier = notifier
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
self._state.register_task_retry_callback(self._failure_retry)
@@ -487,8 +487,7 @@ class Scheduler:
if self.terminated:
return
- notification = Notification(NotificationType.INTERRUPT)
- self._notify(notification)
+ self._interrupt_callback()
# _terminate_event():
#
@@ -541,7 +540,7 @@ class Scheduler:
# Regular timeout for driving status in the UI
def _tick(self):
- self._notify(Notification(NotificationType.TICK))
+ self._ticker_callback()
self.loop.call_later(1, self._tick)
def _failure_retry(self, action_name, unique_id):
@@ -556,11 +555,6 @@ class Scheduler:
queue._task_group.failed_tasks.remove(element._get_full_name())
queue.enqueue([element])
- def _notify(self, notification):
- # Scheduler to Stream notifcations on right side
- self._notification_queue.append(notification)
- self._notifier()
-
def _stream_notification_handler(self):
notification = self._notification_queue.popleft()
if notification.notification_type == NotificationType.TERMINATE:
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 7c8baf233..b8f42f582 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -94,11 +94,9 @@ class Stream:
context.messenger.set_state(self._state)
self._scheduler = Scheduler(
- context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler
+ context, session_start, self._state, self._notification_queue, interrupt_callback, ticker_callback
)
self._session_start_callback = session_start_callback
- self._ticker_callback = ticker_callback
- self._interrupt_callback = interrupt_callback
self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler
self._running = False
self._terminated = False
@@ -1647,18 +1645,6 @@ class Stream:
return element_targets, artifact_refs
- def _scheduler_notification_handler(self):
- # Check the queue is there
- assert self._notification_queue
- notification = self._notification_queue.pop()
-
- if notification.notification_type == NotificationType.INTERRUPT:
- self._interrupt_callback()
- elif notification.notification_type == NotificationType.TICK:
- self._ticker_callback()
- else:
- raise StreamError("Unrecognised notification type received: {}".format(notification.notification_type))
-
def _notify(self, notification):
# Stream to scheduler notifcations on left side
self._notification_queue.appendleft(notification)