summaryrefslogtreecommitdiff
path: root/src/buildstream/_stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r--src/buildstream/_stream.py17
1 files changed, 15 insertions, 2 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c97cf2861..ee5a48b28 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -28,8 +28,10 @@ import shlex
import shutil
import tarfile
import tempfile
+import multiprocessing as mp
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
+import queue
from ._artifact import Artifact
from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -80,10 +82,12 @@ class Stream():
self._project = None
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
+ self._notification_queue = mp.Queue()
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
+ self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
+ self._scheduler_notification_handler,
interrupt_callback=interrupt_callback,
ticker_callback=ticker_callback)
self._first_non_track_queue = None
@@ -1586,7 +1590,16 @@ class Stream():
return element_targets, artifact_refs
- def _scheduler_notification_handler(self, notification):
+ def _scheduler_notification_handler(self):
+ # Check the queue is there and a scheduler is running
+ assert self._notification_queue
+ notification = None
+ #while notification is None:
+ #try:
+ notification = self._notification_queue.get_nowait()
+ #except queue.Empty:
+ # pass
+
if notification.notification_type == NotificationType.INTERRUPT:
self._interrupt_callback()
elif notification.notification_type == NotificationType.TICK: