diff options
author | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-08-12 15:04:57 +0100 |
---|---|---|
committer | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-08-15 12:53:32 +0100 |
commit | 0106e1bfe9d11df7690078bf454519e73407a3e2 (patch) | |
tree | 4f50d243fd17e479173a6e298785a7e1c35fed88 | |
parent | 998fd07484c609e325ea9d533c7a2358cbe8d41a (diff) | |
download | buildstream-tpollard/notificationhandlertmp.tar.gz |
switching to queuetpollard/notificationhandlertmp
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 26 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 17 |
2 files changed, 34 insertions, 9 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index e2a180c16..77eca2520 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -26,6 +26,7 @@ from itertools import chain import signal import datetime from contextlib import contextmanager +import time # Local imports from .resources import Resources, ResourceType @@ -95,7 +96,7 @@ class Notification: class Scheduler(): def __init__(self, context, - start_time, state, notification_handler, + start_time, state, notification_queue, notifier, interrupt_callback=None, ticker_callback=None): @@ -126,8 +127,9 @@ class Scheduler(): self._cleanup_scheduled = False # Whether we have a cleanup job scheduled self._cleanup_running = None # A running CleanupJob, or None - # Callback to send notifications to report back to the Scheduler's owner - self.notify = notification_handler + # Message to send notifications back to the Scheduler's owner + self._notification_queue = notification_queue + self._notifier = notifier # Whether our exclusive jobs, like 'cleanup' are currently already # waiting or active. @@ -294,7 +296,7 @@ class Scheduler(): job_action=job.action_name, job_status=status, element=element) - self.notify(notification) + self._notify(notification) self._sched() # check_cache_size(): @@ -355,7 +357,7 @@ class Scheduler(): full_name=job.name, job_action=job.action_name, elapsed_time=self.elapsed_time()) - self.notify(notification) + self._notify(notification) job.start() # Callback for the cache size job @@ -575,7 +577,7 @@ class Scheduler(): return notification = Notification(NotificationType.INTERRUPT) - self.notify(notification) + self._notify(notification) # _terminate_event(): # @@ -635,9 +637,19 @@ class Scheduler(): # Regular timeout for driving status in the UI def _tick(self): notification = Notification(NotificationType.TICK) - self.notify(notification) + self._notify(notification) self.loop.call_later(1, self._tick) + def _notify(self, notification): + self._notification_queue.put(notification) + x = 0 + while self._notification_queue.empty(): + time.sleep(0.1) + x = x +1 + if x == 10: + raise ValueError("queue still empty") + self._notifier() + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index c97cf2861..ee5a48b28 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -28,8 +28,10 @@ import shlex import shutil import tarfile import tempfile +import multiprocessing as mp from contextlib import contextmanager, suppress from fnmatch import fnmatch +import queue from ._artifact import Artifact from ._artifactelement import verify_artifact_ref, ArtifactElement @@ -80,10 +82,12 @@ class Stream(): self._project = None self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state + self._notification_queue = mp.Queue() context.messenger.set_state(self._state) - self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler, + self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue, + self._scheduler_notification_handler, interrupt_callback=interrupt_callback, ticker_callback=ticker_callback) self._first_non_track_queue = None @@ -1586,7 +1590,16 @@ class Stream(): return element_targets, artifact_refs - def _scheduler_notification_handler(self, notification): + def _scheduler_notification_handler(self): + # Check the queue is there and a scheduler is running + assert self._notification_queue + notification = None + #while notification is None: + #try: + notification = self._notification_queue.get_nowait() + #except queue.Empty: + # pass + if notification.notification_type == NotificationType.INTERRUPT: self._interrupt_callback() elif notification.notification_type == NotificationType.TICK: |