From 9aeeafeb1a708e4ec06c7ddaf38ac4d594f83b9b Mon Sep 17 00:00:00 2001 From: Benjamin Schubert Date: Sat, 4 Jul 2020 18:20:35 +0000 Subject: scheduler.py: Remove all usage of notifications Call directly the relevant methods from the stream to the scheduler --- src/buildstream/_scheduler/__init__.py | 2 +- src/buildstream/_scheduler/scheduler.py | 89 ++++++--------------------------- src/buildstream/_stream.py | 26 ++-------- 3 files changed, 20 insertions(+), 97 deletions(-) diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py index d689d6e25..d2f458fa5 100644 --- a/src/buildstream/_scheduler/__init__.py +++ b/src/buildstream/_scheduler/__init__.py @@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue from .queues.artifactpushqueue import ArtifactPushQueue from .queues.pullqueue import PullQueue -from .scheduler import Scheduler, SchedStatus, Notification, NotificationType +from .scheduler import Scheduler, SchedStatus from .jobs import ElementJob, JobStatus diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index a2f1c241f..3e6bf1f92 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -46,49 +46,6 @@ class SchedStatus(FastEnum): TERMINATED = 1 -# NotificationType() -# -# Type of notification for inter-process communication -# between 'front' & 'back' end when a scheduler is executing. -# This is used as a parameter for a Notification object, -# to be used as a conditional for control or state handling. -# -class NotificationType(FastEnum): - TERMINATE = "terminate" - QUIT = "quit" - SUSPEND = "suspend" - UNSUSPEND = "unsuspend" - - -# Notification() -# -# An object to be passed across a bidirectional queue between -# Stream & Scheduler. A required NotificationType() parameter -# with accompanying information can be added as a member if -# required. NOTE: The notification object should be lightweight -# and all attributes must be picklable. -# -class Notification: - def __init__( - self, - notification_type, - *, - full_name=None, - job_action=None, - job_status=None, - time=None, - element=None, - message=None - ): - self.notification_type = notification_type - self.full_name = full_name - self.job_action = job_action - self.job_status = job_status - self.time = time - self.element = element - self.message = message - - # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -110,7 +67,7 @@ class Notification: # ticker_callback: A callback call once per second # class Scheduler: - def __init__(self, context, start_time, state, notification_queue, interrupt_callback, ticker_callback): + def __init__(self, context, start_time, state, interrupt_callback, ticker_callback): # # Public members @@ -138,9 +95,6 @@ class Scheduler: self._ticker_callback = ticker_callback self._interrupt_callback = interrupt_callback - # Bidirectional queue to send notifications back to the Scheduler's owner - self._notification_queue = notification_queue - self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) self._state.register_task_retry_callback(self._failure_retry) @@ -233,7 +187,7 @@ class Scheduler: self.queues.clear() - # terminate_jobs() + # terminate() # # Forcefully terminates all ongoing jobs. # @@ -245,7 +199,7 @@ class Scheduler: # termination is not interrupted, and SIGINT will # remain blocked after Scheduler.run() returns. # - def terminate_jobs(self): + def terminate(self): # Set this right away, the frontend will check this # attribute to decide whether or not to print status info @@ -260,28 +214,28 @@ class Scheduler: # this will remain blocked forever. signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT]) - # jobs_suspended() + # suspend() # - # Suspend jobs after being notified + # Suspend the scheduler # - def jobs_suspended(self): + def suspend(self): self._disconnect_signals() self._suspend_jobs() - # jobs_unsuspended() + # resume() # - # Unsuspend jobs after being notified + # Restart the scheduler # - def jobs_unsuspended(self): + def resume(self): self._resume_jobs() self._connect_signals() - # stop_queueing() + # stop() # # Stop queueing additional jobs, causes Scheduler.run() # to return once all currently processing jobs are finished. # - def stop_queueing(self): + def stop(self): self._queue_jobs = False # job_completed(): @@ -333,7 +287,7 @@ class Scheduler: self.context.messenger.message(message) self._casd_process.returncode = returncode - self.terminate_jobs() + self.terminate() # _start_job() # @@ -398,7 +352,7 @@ class Scheduler: if not self.context.prepare_fork(): message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active") self.context.messenger.message(message) - self.terminate_jobs() + self.terminate() return # Start the jobs @@ -494,7 +448,7 @@ class Scheduler: # A loop registered event callback for SIGTERM # def _terminate_event(self): - self.terminate_jobs() + self.terminate() # _suspend_event(): # @@ -555,21 +509,6 @@ class Scheduler: queue._task_group.failed_tasks.remove(element._get_full_name()) queue.enqueue([element]) - def _stream_notification_handler(self): - notification = self._notification_queue.popleft() - if notification.notification_type == NotificationType.TERMINATE: - self.terminate_jobs() - elif notification.notification_type == NotificationType.QUIT: - self.stop_queueing() - elif notification.notification_type == NotificationType.SUSPEND: - self.jobs_suspended() - elif notification.notification_type == NotificationType.UNSUSPEND: - self.jobs_unsuspended() - else: - # Do not raise exception once scheduler process is separated - # as we don't want to pickle exceptions between processes - raise ValueError("Unrecognised notification type received") - def _handle_exception(self, loop, context: dict) -> None: e = context.get("exception") exc = bool(e) diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 0cef20e55..fb98aea20 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -43,8 +43,6 @@ from ._scheduler import ( BuildQueue, PullQueue, ArtifactPushQueue, - NotificationType, - Notification, ) from .element import Element from ._pipeline import Pipeline @@ -93,11 +91,8 @@ class Stream: context.messenger.set_state(self._state) - self._scheduler = Scheduler( - context, session_start, self._state, self._notification_queue, interrupt_callback, ticker_callback - ) + self._scheduler = Scheduler(context, session_start, self._state, interrupt_callback, ticker_callback) self._session_start_callback = session_start_callback - self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler self._running = False self._terminated = False self._suspended = False @@ -1041,8 +1036,7 @@ class Stream: # Terminate jobs # def terminate(self): - notification = Notification(NotificationType.TERMINATE) - self._notify(notification) + self._scheduler.terminate() self._terminated = True # quit() @@ -1052,8 +1046,7 @@ class Stream: # of ongoing jobs # def quit(self): - notification = Notification(NotificationType.QUIT) - self._notify(notification) + self._scheduler.stop() # suspend() # @@ -1061,15 +1054,11 @@ class Stream: # @contextmanager def suspend(self): - # Send the notification to suspend jobs - notification = Notification(NotificationType.SUSPEND) - self._notify(notification) + self._scheduler.suspend() self._suspended = True yield self._suspended = False - # Unsuspend jobs on context exit - notification = Notification(NotificationType.UNSUSPEND) - self._notify(notification) + self._scheduler.resume() ############################################################# # Private Methods # @@ -1603,11 +1592,6 @@ class Stream: return element_targets, artifact_refs - def _notify(self, notification): - # Stream to scheduler notifcations on left side - self._notification_queue.appendleft(notification) - self._notifier() - # _handle_compression() # -- cgit v1.2.1