summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-08-12 15:04:57 +0100
committerTom Pollard <tom.pollard@codethink.co.uk>2019-08-15 12:53:32 +0100
commit0106e1bfe9d11df7690078bf454519e73407a3e2 (patch)
tree4f50d243fd17e479173a6e298785a7e1c35fed88
parent998fd07484c609e325ea9d533c7a2358cbe8d41a (diff)
downloadbuildstream-tpollard/notificationhandlertmp.tar.gz
-rw-r--r--src/buildstream/_scheduler/scheduler.py26
-rw-r--r--src/buildstream/_stream.py17
2 files changed, 34 insertions, 9 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index e2a180c16..77eca2520 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -26,6 +26,7 @@ from itertools import chain
import signal
import datetime
from contextlib import contextmanager
+import time
# Local imports
from .resources import Resources, ResourceType
@@ -95,7 +96,7 @@ class Notification:
class Scheduler():
def __init__(self, context,
- start_time, state, notification_handler,
+ start_time, state, notification_queue, notifier,
interrupt_callback=None,
ticker_callback=None):
@@ -126,8 +127,9 @@ class Scheduler():
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
- # Callback to send notifications to report back to the Scheduler's owner
- self.notify = notification_handler
+ # Message to send notifications back to the Scheduler's owner
+ self._notification_queue = notification_queue
+ self._notifier = notifier
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
@@ -294,7 +296,7 @@ class Scheduler():
job_action=job.action_name,
job_status=status,
element=element)
- self.notify(notification)
+ self._notify(notification)
self._sched()
# check_cache_size():
@@ -355,7 +357,7 @@ class Scheduler():
full_name=job.name,
job_action=job.action_name,
elapsed_time=self.elapsed_time())
- self.notify(notification)
+ self._notify(notification)
job.start()
# Callback for the cache size job
@@ -575,7 +577,7 @@ class Scheduler():
return
notification = Notification(NotificationType.INTERRUPT)
- self.notify(notification)
+ self._notify(notification)
# _terminate_event():
#
@@ -635,9 +637,19 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
notification = Notification(NotificationType.TICK)
- self.notify(notification)
+ self._notify(notification)
self.loop.call_later(1, self._tick)
+ def _notify(self, notification):
+ self._notification_queue.put(notification)
+ x = 0
+ while self._notification_queue.empty():
+ time.sleep(0.1)
+ x = x +1
+ if x == 10:
+ raise ValueError("queue still empty")
+ self._notifier()
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c97cf2861..ee5a48b28 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -28,8 +28,10 @@ import shlex
import shutil
import tarfile
import tempfile
+import multiprocessing as mp
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
+import queue
from ._artifact import Artifact
from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -80,10 +82,12 @@ class Stream():
self._project = None
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
+ self._notification_queue = mp.Queue()
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
+ self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
+ self._scheduler_notification_handler,
interrupt_callback=interrupt_callback,
ticker_callback=ticker_callback)
self._first_non_track_queue = None
@@ -1586,7 +1590,16 @@ class Stream():
return element_targets, artifact_refs
- def _scheduler_notification_handler(self, notification):
+ def _scheduler_notification_handler(self):
+ # Check the queue is there and a scheduler is running
+ assert self._notification_queue
+ notification = None
+ #while notification is None:
+ #try:
+ notification = self._notification_queue.get_nowait()
+ #except queue.Empty:
+ # pass
+
if notification.notification_type == NotificationType.INTERRUPT:
self._interrupt_callback()
elif notification.notification_type == NotificationType.TICK: