summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-10-11 10:45:58 +0100
committerTom Pollard <tom.pollard@codethink.co.uk>2019-10-11 10:45:58 +0100
commitd7d056bff1e90dae359d8feb0f287ce0a5f098a8 (patch)
treed4030690f62acbac660876ab9d43290248d3f5b5
parentbeb8dc7397378bab726e405531e816c99c500996 (diff)
downloadbuildstream-tpollard/streamasync.tar.gz
basic async in streamtpollard/streamasync
-rw-r--r--src/buildstream/_scheduler/scheduler.py1
-rw-r--r--src/buildstream/_stream.py77
2 files changed, 57 insertions, 21 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 122ba3716..0d06500c0 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -68,6 +68,7 @@ class NotificationType(FastEnum):
START = "start"
TASK_GROUPS = "task_groups"
ELEMENT_TOTALS = "element_totals"
+ FINISH = "finish"
# Notification()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 74f7755e0..f0f61383e 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -74,6 +74,7 @@ class Stream():
self.queues = [] # Queue objects
self.len_session_elements = None
self.len_total_elements = None
+ self.loop = None
#
# Private members
@@ -141,26 +142,37 @@ class Stream():
self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
kwargs=kwargs, name=process_name)
+
self._subprocess.start()
+ # We can now launch another async
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self.loop)
+ self._start_listening()
+ #raise ValueError("started listening")
+ self.loop.run_forever()
+
+ # Run forever needs to be forcefully stopped, else we never exit the statement
+
+ #raise ValueError("run_forever")
# TODO connect signal handlers with asyncio
- while self._subprocess.exitcode is None:
+ #while self._subprocess.exitcode is None:
# check every given time interval on subprocess state
- self._subprocess.join(0.01)
- # if no exit code, go back to checking the message queue
- self._loop()
-
+ #self._subprocess.join(0.01)
+ # Scheduler has stopped running, so safe to still have async here
+ self._stop_listening()
+ #print("closing the loop")
+ #raise ValueError("closing loop")
+ #self.loop.stop()
+ self.loop.close()
+ self.loop = None
# Set main process back
utils._reset_main_pid()
# Ensure no more notifcations to process
- try:
- while True:
- notification = self._notify_front_queue.get_nowait()
- self._notification_handler(notification)
- except queue.Empty:
- pass
-
+ while not self._notify_front_queue.empty():
+ notification = self._notify_front_queue.get_nowait()
+ self._notification_handler(notification)
# cleanup()
#
@@ -1456,6 +1468,9 @@ class Stream():
status = self._scheduler.run(self.queues)
+ # Scheduler has finished running, send frontend notification
+ self._notify_front(Notification(NotificationType.FINISH))
+
if status == SchedStatus.ERROR:
raise StreamError()
if status == SchedStatus.TERMINATED:
@@ -1774,12 +1789,18 @@ class Stream():
elif notification.notification_type == NotificationType.TASK_ERROR:
set_last_task_error(*notification.task_error)
elif notification.notification_type == NotificationType.EXCEPTION:
+ # If we're looping, stop
+ if self.loop:
+ self.loop.stop()
# Regenerate the exception here, so we don't have to pickle it
raise SubprocessException(**notification.exception)
elif notification.notification_type == NotificationType.START:
self._session_start_callback()
elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
self.len_session_elements, self.len_total_elements = notification.element_totals
+ elif notification.notification_type == NotificationType.FINISH:
+ if self.loop:
+ self.loop.stop()
else:
raise StreamError("Unrecognised notification type received")
@@ -1797,16 +1818,30 @@ class Stream():
# The code to be run by the Stream's event loop while delegating
# work to a subprocess with the @subprocessed decorator
- def _loop(self):
- assert self._notify_front_queue
+ #def _loop(self):
+ #assert self._notify_front_queue
# Check for and process new messages
- while True:
- try:
- notification = self._notify_front_queue.get_nowait()
- self._notification_handler(notification)
- except queue.Empty:
- notification = None
- break
+ #while True:
+ #try:
+ #notification = self._notify_front_queue.get_nowait()
+ #self._notification_handler(notification)
+ #except queue.Empty:
+ #notification = None
+ #break
+
+ def _loop(self):
+ while not self._notify_front_queue.empty():
+ notification = self._notify_front_queue.get_nowait()
+ self._notification_handler(notification)
+
+ def _start_listening(self):
+ if self._notify_front_queue:
+ self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop)
+
+ def _stop_listening(self):
+ if self._notify_front_queue:
+ self.loop.remove_reader(self._notify_front_queue._reader.fileno())
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing