summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Schubert <contact@benschubert.me>2020-07-04 18:20:35 +0000
committerBenjamin Schubert <contact@benschubert.me>2020-07-05 15:09:17 +0000
commit9aeeafeb1a708e4ec06c7ddaf38ac4d594f83b9b (patch)
treeb5359d5427e4b5016edad7083f8806f96e8853b5
parent0ac01c40872881593b91bb723ade11f133601cda (diff)
downloadbuildstream-bschubert/simplify-stream-interactions.tar.gz
scheduler.py: Remove all usage of notificationsbschubert/simplify-stream-interactions
Call directly the relevant methods from the stream to the scheduler
-rw-r--r--src/buildstream/_scheduler/__init__.py2
-rw-r--r--src/buildstream/_scheduler/scheduler.py89
-rw-r--r--src/buildstream/_stream.py26
3 files changed, 20 insertions, 97 deletions
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d689d6e25..d2f458fa5 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue
from .queues.artifactpushqueue import ArtifactPushQueue
from .queues.pullqueue import PullQueue
-from .scheduler import Scheduler, SchedStatus, Notification, NotificationType
+from .scheduler import Scheduler, SchedStatus
from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index a2f1c241f..3e6bf1f92 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -46,49 +46,6 @@ class SchedStatus(FastEnum):
TERMINATED = 1
-# NotificationType()
-#
-# Type of notification for inter-process communication
-# between 'front' & 'back' end when a scheduler is executing.
-# This is used as a parameter for a Notification object,
-# to be used as a conditional for control or state handling.
-#
-class NotificationType(FastEnum):
- TERMINATE = "terminate"
- QUIT = "quit"
- SUSPEND = "suspend"
- UNSUSPEND = "unsuspend"
-
-
-# Notification()
-#
-# An object to be passed across a bidirectional queue between
-# Stream & Scheduler. A required NotificationType() parameter
-# with accompanying information can be added as a member if
-# required. NOTE: The notification object should be lightweight
-# and all attributes must be picklable.
-#
-class Notification:
- def __init__(
- self,
- notification_type,
- *,
- full_name=None,
- job_action=None,
- job_status=None,
- time=None,
- element=None,
- message=None
- ):
- self.notification_type = notification_type
- self.full_name = full_name
- self.job_action = job_action
- self.job_status = job_status
- self.time = time
- self.element = element
- self.message = message
-
-
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -110,7 +67,7 @@ class Notification:
# ticker_callback: A callback call once per second
#
class Scheduler:
- def __init__(self, context, start_time, state, notification_queue, interrupt_callback, ticker_callback):
+ def __init__(self, context, start_time, state, interrupt_callback, ticker_callback):
#
# Public members
@@ -138,9 +95,6 @@ class Scheduler:
self._ticker_callback = ticker_callback
self._interrupt_callback = interrupt_callback
- # Bidirectional queue to send notifications back to the Scheduler's owner
- self._notification_queue = notification_queue
-
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
self._state.register_task_retry_callback(self._failure_retry)
@@ -233,7 +187,7 @@ class Scheduler:
self.queues.clear()
- # terminate_jobs()
+ # terminate()
#
# Forcefully terminates all ongoing jobs.
#
@@ -245,7 +199,7 @@ class Scheduler:
# termination is not interrupted, and SIGINT will
# remain blocked after Scheduler.run() returns.
#
- def terminate_jobs(self):
+ def terminate(self):
# Set this right away, the frontend will check this
# attribute to decide whether or not to print status info
@@ -260,28 +214,28 @@ class Scheduler:
# this will remain blocked forever.
signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
- # jobs_suspended()
+ # suspend()
#
- # Suspend jobs after being notified
+ # Suspend the scheduler
#
- def jobs_suspended(self):
+ def suspend(self):
self._disconnect_signals()
self._suspend_jobs()
- # jobs_unsuspended()
+ # resume()
#
- # Unsuspend jobs after being notified
+ # Restart the scheduler
#
- def jobs_unsuspended(self):
+ def resume(self):
self._resume_jobs()
self._connect_signals()
- # stop_queueing()
+ # stop()
#
# Stop queueing additional jobs, causes Scheduler.run()
# to return once all currently processing jobs are finished.
#
- def stop_queueing(self):
+ def stop(self):
self._queue_jobs = False
# job_completed():
@@ -333,7 +287,7 @@ class Scheduler:
self.context.messenger.message(message)
self._casd_process.returncode = returncode
- self.terminate_jobs()
+ self.terminate()
# _start_job()
#
@@ -398,7 +352,7 @@ class Scheduler:
if not self.context.prepare_fork():
message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
self.context.messenger.message(message)
- self.terminate_jobs()
+ self.terminate()
return
# Start the jobs
@@ -494,7 +448,7 @@ class Scheduler:
# A loop registered event callback for SIGTERM
#
def _terminate_event(self):
- self.terminate_jobs()
+ self.terminate()
# _suspend_event():
#
@@ -555,21 +509,6 @@ class Scheduler:
queue._task_group.failed_tasks.remove(element._get_full_name())
queue.enqueue([element])
- def _stream_notification_handler(self):
- notification = self._notification_queue.popleft()
- if notification.notification_type == NotificationType.TERMINATE:
- self.terminate_jobs()
- elif notification.notification_type == NotificationType.QUIT:
- self.stop_queueing()
- elif notification.notification_type == NotificationType.SUSPEND:
- self.jobs_suspended()
- elif notification.notification_type == NotificationType.UNSUSPEND:
- self.jobs_unsuspended()
- else:
- # Do not raise exception once scheduler process is separated
- # as we don't want to pickle exceptions between processes
- raise ValueError("Unrecognised notification type received")
-
def _handle_exception(self, loop, context: dict) -> None:
e = context.get("exception")
exc = bool(e)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 0cef20e55..fb98aea20 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -43,8 +43,6 @@ from ._scheduler import (
BuildQueue,
PullQueue,
ArtifactPushQueue,
- NotificationType,
- Notification,
)
from .element import Element
from ._pipeline import Pipeline
@@ -93,11 +91,8 @@ class Stream:
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(
- context, session_start, self._state, self._notification_queue, interrupt_callback, ticker_callback
- )
+ self._scheduler = Scheduler(context, session_start, self._state, interrupt_callback, ticker_callback)
self._session_start_callback = session_start_callback
- self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler
self._running = False
self._terminated = False
self._suspended = False
@@ -1041,8 +1036,7 @@ class Stream:
# Terminate jobs
#
def terminate(self):
- notification = Notification(NotificationType.TERMINATE)
- self._notify(notification)
+ self._scheduler.terminate()
self._terminated = True
# quit()
@@ -1052,8 +1046,7 @@ class Stream:
# of ongoing jobs
#
def quit(self):
- notification = Notification(NotificationType.QUIT)
- self._notify(notification)
+ self._scheduler.stop()
# suspend()
#
@@ -1061,15 +1054,11 @@ class Stream:
#
@contextmanager
def suspend(self):
- # Send the notification to suspend jobs
- notification = Notification(NotificationType.SUSPEND)
- self._notify(notification)
+ self._scheduler.suspend()
self._suspended = True
yield
self._suspended = False
- # Unsuspend jobs on context exit
- notification = Notification(NotificationType.UNSUSPEND)
- self._notify(notification)
+ self._scheduler.resume()
#############################################################
# Private Methods #
@@ -1603,11 +1592,6 @@ class Stream:
return element_targets, artifact_refs
- def _notify(self, notification):
- # Stream to scheduler notifcations on left side
- self._notification_queue.appendleft(notification)
- self._notifier()
-
# _handle_compression()
#