diff options
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r-- | src/buildstream/_stream.py | 17 |
1 files changed, 15 insertions, 2 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index c97cf2861..ee5a48b28 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -28,8 +28,10 @@ import shlex import shutil import tarfile import tempfile +import multiprocessing as mp from contextlib import contextmanager, suppress from fnmatch import fnmatch +import queue from ._artifact import Artifact from ._artifactelement import verify_artifact_ref, ArtifactElement @@ -80,10 +82,12 @@ class Stream(): self._project = None self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state + self._notification_queue = mp.Queue() context.messenger.set_state(self._state) - self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler, + self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue, + self._scheduler_notification_handler, interrupt_callback=interrupt_callback, ticker_callback=ticker_callback) self._first_non_track_queue = None @@ -1586,7 +1590,16 @@ class Stream(): return element_targets, artifact_refs - def _scheduler_notification_handler(self, notification): + def _scheduler_notification_handler(self): + # Check the queue is there and a scheduler is running + assert self._notification_queue + notification = None + #while notification is None: + #try: + notification = self._notification_queue.get_nowait() + #except queue.Empty: + # pass + if notification.notification_type == NotificationType.INTERRUPT: self._interrupt_callback() elif notification.notification_type == NotificationType.TICK: |