summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author bst-marge-bot <marge-bot@buildstream.build> 2019-09-10 11:17:39 +0000
committer bst-marge-bot <marge-bot@buildstream.build> 2019-09-10 11:17:39 +0000
commit 826e12a2804dfa49d55eb59f07dd2d116ada10c2 (patch)
tree b6c38c31da12c85d486bf4333f9559ea86f85f7a
parent a200338e3b1947c0d8e7a89b1977f753988dc76e (diff)
parent a97af8edd7e1e9385c45fca2761f7327f9180059 (diff)
download buildstream-826e12a2804dfa49d55eb59f07dd2d116ada10c2.tar.gz
Merge branch 'tpollard/notificationhandler' into 'master'
Stream - Scheduler notification handler See merge request BuildStream/buildstream!1550
-rw-r--r-- src/buildstream/_frontend/app.py 6
-rw-r--r-- src/buildstream/_scheduler/__init__.py 2
-rw-r--r-- src/buildstream/_scheduler/jobs/job.py 6
-rw-r--r-- src/buildstream/_scheduler/queues/queue.py 3
-rw-r--r-- src/buildstream/_scheduler/scheduler.py 182
-rw-r--r-- src/buildstream/_state.py 24
-rw-r--r-- src/buildstream/_stream.py 90
7 files changed, 234 insertions, 79 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index f9729a7ce..45160afbc 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -663,11 +663,7 @@ class App():
elif choice == 'retry':
click.echo("\nRetrying failed job\n", err=True)
unique_id = element[0]
- try:
- self.stream._failure_retry(action_name, unique_id)
- except StreamError:
- click.echo("Job action {} does not have a corresponding queue".format(action_name), err=True)
- self.stream.terminate()
+ self.stream._failure_retry(action_name, unique_id)
#
# Print the session heading if we've loaded a pipeline and there
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458fa5..d689d6e25 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue
from .queues.artifactpushqueue import ArtifactPushQueue
from .queues.pullqueue import PullQueue
-from .scheduler import Scheduler, SchedStatus
+from .scheduler import Scheduler, SchedStatus, Notification, NotificationType
from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 9af08df92..913e27ea2 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -393,8 +393,8 @@ class Job():
element_name = self._message_element_name
if element_key is None:
element_key = self._message_element_key
- self._scheduler.context.messenger.message(
- Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs))
+ message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
+ self._scheduler.notify_messenger(message)
# get_element()
#
@@ -536,7 +536,7 @@ class Job():
if envelope.message_type is _MessageType.LOG_MESSAGE:
# Propagate received messages from children
# back through the context.
- self._scheduler.context.messenger.message(envelope.message)
+ self._scheduler.notify_messenger(envelope.message)
elif envelope.message_type is _MessageType.ERROR:
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 745b59417..6c6dfdc4f 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -345,9 +345,8 @@ class Queue():
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
- context = element._get_context()
message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs)
- context.messenger.message(message)
+ self._scheduler.notify_messenger(message)
def _element_log_path(self, element):
project = element._get_project()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 37295b285..d0a189545 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -24,21 +24,74 @@ import asyncio
from itertools import chain
import signal
import datetime
-from contextlib import contextmanager
# Local imports
from .resources import Resources
from .jobs import JobStatus
+from ..types import FastEnum
from .._profile import Topics, PROFILER
+from ..plugin import Plugin
# A decent return code for Scheduler.run()
-class SchedStatus():
+class SchedStatus(FastEnum):
SUCCESS = 0
ERROR = -1
TERMINATED = 1
+# NotificationType()
+#
+# Type of notification for inter-process communication
+# between 'front' & 'back' end when a scheduler is executing.
+# This is used as a parameter for a Notification object,
+# to be used as a conditional for control or state handling.
+#
+class NotificationType(FastEnum):
+ INTERRUPT = "interrupt"
+ JOB_START = "job_start"
+ JOB_COMPLETE = "job_complete"
+ TICK = "tick"
+ TERMINATE = "terminate"
+ QUIT = "quit"
+ SCHED_START_TIME = "sched_start_time"
+ RUNNING = "running"
+ TERMINATED = "terminated"
+ SUSPEND = "suspend"
+ UNSUSPEND = "unsuspend"
+ SUSPENDED = "suspended"
+ RETRY = "retry"
+ MESSAGE = "message"
+
+
+# Notification()
+#
+# An object to be passed across a bidirectional queue between
+# Stream & Scheduler. A required NotificationType() parameter
+# with accompanying information can be added as a member if
+# required. NOTE: The notification object should be lightweight
+# and all attributes must be picklable.
+#
+class Notification():
+
+ def __init__(self,
+ notification_type,
+ *,
+ full_name=None,
+ job_action=None,
+ job_status=None,
+ time=None,
+ element=None,
+ message=None):
+ self.notification_type = notification_type
+ self.full_name = full_name
+ self.job_action = job_action
+ self.job_status = job_status
+ self.time = time
+ self.element = element
+ self.message = message
+
+
# Scheduler()
#
# The scheduler operates on a list of queues, each of which is meant to accomplish
@@ -62,9 +115,7 @@ class SchedStatus():
class Scheduler():
def __init__(self, context,
- start_time, state,
- interrupt_callback=None,
- ticker_callback=None):
+ start_time, state, notification_queue, notifier):
#
# Public members
@@ -87,9 +138,9 @@ class Scheduler():
self._queue_jobs = True # Whether we should continue to queue jobs
self._state = state
- # Callbacks to report back to the Scheduler owner
- self._interrupt_callback = interrupt_callback
- self._ticker_callback = ticker_callback
+ # Bidirectional queue to send notifications back to the Scheduler's owner
+ self._notification_queue = notification_queue
+ self._notifier = notifier
self.resources = Resources(context.sched_builders,
context.sched_fetchers,
@@ -120,9 +171,11 @@ class Scheduler():
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
+ # Notify that the loop has been created
+ self._notify(Notification(NotificationType.RUNNING))
+
# Add timeouts
- if self._ticker_callback:
- self.loop.call_later(1, self._tick)
+ self.loop.call_later(1, self._tick)
# Handle unix signals while running
self._connect_signals()
@@ -140,6 +193,9 @@ class Scheduler():
failed = any(queue.any_failed_elements() for queue in self.queues)
self.loop = None
+ # Notify that the loop has been reset (the receiver toggles its running flag on each RUNNING notification)
+ self._notify(Notification(NotificationType.RUNNING))
+
if failed:
status = SchedStatus.ERROR
elif self.terminated:
@@ -181,6 +237,10 @@ class Scheduler():
# attribute to decide whether or not to print status info
# etc and the following code block will trigger some callbacks.
self.terminated = True
+
+ # Notify the frontend that we're terminated as it might be
+ # from an interactive prompt callback or SIGTERM
+ self._notify(Notification(NotificationType.TERMINATED))
self.loop.call_soon(self._terminate_jobs_real)
# Block this until we're finished terminating jobs,
@@ -189,15 +249,17 @@ class Scheduler():
# jobs_suspended()
#
- # A context manager for running with jobs suspended
+ # Suspend jobs after being notified
#
- @contextmanager
def jobs_suspended(self):
self._disconnect_signals()
self._suspend_jobs()
- yield
-
+ # jobs_unsuspended()
+ #
+ # Unsuspend jobs after being notified
+ #
+ def jobs_unsuspended(self):
self._resume_jobs()
self._connect_signals()
@@ -209,21 +271,6 @@ class Scheduler():
def stop_queueing(self):
self._queue_jobs = False
- # elapsed_time()
- #
- # Fetches the current session elapsed time
- #
- # Returns:
- # (timedelta): The amount of time since the start of the session,
- # discounting any time spent while jobs were suspended.
- #
- def elapsed_time(self):
- timenow = datetime.datetime.now()
- starttime = self._starttime
- if not starttime:
- starttime = timenow
- return timenow - starttime
-
# job_completed():
#
# Called when a Job completes
@@ -234,11 +281,10 @@ class Scheduler():
# status (JobStatus): The status of the completed job
#
def job_completed(self, job, status):
-
# Remove from the active jobs list
self._active_jobs.remove(job)
- self._state.remove_task(job.action_name, job.name)
+ element_info = None
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the unique_id and display key tuple of the Element. This can then
@@ -248,11 +294,27 @@ class Scheduler():
element_info = element._unique_id, element._get_display_key()
else:
element_info = None
- self._state.fail_task(job.action_name, job.name, element=element_info)
# Now check for more jobs
+ notification = Notification(NotificationType.JOB_COMPLETE,
+ full_name=job.name,
+ job_action=job.action_name,
+ job_status=status,
+ element=element_info)
+ self._notify(notification)
self._sched()
+ # notify_messenger()
+ #
+ # Send message over notification queue to Messenger callback
+ #
+ # Args:
+ # message (Message): A Message() to be sent to the frontend message
+ # handler, as assigned by context's messenger.
+ #
+ def notify_messenger(self, message):
+ self._notify(Notification(NotificationType.MESSAGE, message=message))
+
#######################################################
# Local Private Methods #
#######################################################
@@ -266,7 +328,11 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- self._state.add_task(job.action_name, job.name, self.elapsed_time())
+ notification = Notification(NotificationType.JOB_START,
+ full_name=job.name,
+ job_action=job.action_name,
+ time=self._state.elapsed_time(start_time=self._starttime))
+ self._notify(notification)
job.start()
# _sched_queue_jobs()
@@ -350,6 +416,8 @@ class Scheduler():
if not self.suspended:
self._suspendtime = datetime.datetime.now()
self.suspended = True
+ # Notify that we're suspended
+ self._notify(Notification(NotificationType.SUSPENDED))
for job in self._active_jobs:
job.suspend()
@@ -362,7 +430,10 @@ class Scheduler():
for job in self._active_jobs:
job.resume()
self.suspended = False
+ # Notify that we're unsuspended (the receiver toggles its suspended flag on each SUSPENDED notification)
+ self._notify(Notification(NotificationType.SUSPENDED))
self._starttime += (datetime.datetime.now() - self._suspendtime)
+ self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
self._suspendtime = None
# _interrupt_event():
@@ -379,13 +450,8 @@ class Scheduler():
if self.terminated:
return
- # Leave this to the frontend to decide, if no
- # interrupt callback was specified, then just terminate.
- if self._interrupt_callback:
- self._interrupt_callback()
- else:
- # Default without a frontend is just terminate
- self.terminate_jobs()
+ notification = Notification(NotificationType.INTERRUPT)
+ self._notify(notification)
# _terminate_event():
#
@@ -444,9 +510,43 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
- self._ticker_callback()
+ self._notify(Notification(NotificationType.TICK))
self.loop.call_later(1, self._tick)
+ def _failure_retry(self, action_name, unique_id):
+ queue = None
+ for q in self.queues:
+ if q.action_name == action_name:
+ queue = q
+ break
+ # Assert queue found, we should only be retrying a queued job
+ assert queue
+ element = Plugin._lookup(unique_id)
+ queue._task_group.failed_tasks.remove(element._get_full_name())
+ queue.enqueue([element])
+
+ def _notify(self, notification):
+ # Scheduler to Stream notifications on right side
+ self._notification_queue.append(notification)
+ self._notifier()
+
+ def _stream_notification_handler(self):
+ notification = self._notification_queue.popleft()
+ if notification.notification_type == NotificationType.TERMINATE:
+ self.terminate_jobs()
+ elif notification.notification_type == NotificationType.QUIT:
+ self.stop_queueing()
+ elif notification.notification_type == NotificationType.SUSPEND:
+ self.jobs_suspended()
+ elif notification.notification_type == NotificationType.UNSUSPEND:
+ self.jobs_unsuspended()
+ elif notification.notification_type == NotificationType.RETRY:
+ self._failure_retry(notification.job_action, notification.element)
+ else:
+ # Do not raise exception once scheduler process is separated
+ # as we don't want to pickle exceptions between processes
+ raise ValueError("Unrecognised notification type received")
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index df3bceff2..c99434018 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -272,7 +272,9 @@ class State():
# it from other tasks with the same action name
# e.g. an element's name.
# elapsed_offset (timedelta): (Optional) The time the task started, relative
- # to buildstream's start time.
+ # to buildstream's start time. Note scheduler tasks
+ # use this as they don't report relative to wallclock time
+ # if the Scheduler has been suspended.
#
def add_task(self, action_name, full_name, elapsed_offset=None):
task_key = (action_name, full_name)
@@ -280,7 +282,7 @@ class State():
"Trying to add task '{}:{}' to '{}'".format(action_name, full_name, self.tasks)
if not elapsed_offset:
- elapsed_offset = datetime.datetime.now() - self._session_start
+ elapsed_offset = self.elapsed_time()
task = _Task(self, action_name, full_name, elapsed_offset)
self.tasks[task_key] = task
@@ -330,6 +332,24 @@ class State():
for cb in self._task_failed_cbs:
cb(action_name, full_name, element)
+ # elapsed_time()
+ #
+ # Fetches the current session elapsed time
+ #
+ # Args:
+ # start_time(time): Optional explicit start time, relative to caller.
+ #
+ # Returns:
+ # (timedelta): The amount of time since the start of the session,
+ # discounting any time spent while jobs were suspended if
+ # start_time given relative to the Scheduler
+ #
+ def elapsed_time(self, start_time=None):
+ time_now = datetime.datetime.now()
+ if start_time is None:
+ start_time = self._session_start or time_now
+ return time_now - start_time
+
# _Task
#
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 2e43bb1a2..293ba051d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -28,12 +28,13 @@ import tarfile
import tempfile
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
+from collections import deque
from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
- SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
+ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
from ._state import State
@@ -78,14 +79,21 @@ class Stream():
self._project = None
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
+ self._notification_queue = deque()
+ self._starttime = session_start # Synchronised with Scheduler's relative start time
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state,
- interrupt_callback=interrupt_callback,
- ticker_callback=ticker_callback)
+ self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
+ self._scheduler_notification_handler)
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
+ self._ticker_callback = ticker_callback
+ self._interrupt_callback = interrupt_callback
+ self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler
+ self._scheduler_running = False
+ self._scheduler_terminated = False
+ self._scheduler_suspended = False
# init()
#
@@ -1072,7 +1080,7 @@ class Stream():
#
@property
def running(self):
- return self._scheduler.loop is not None
+ return self._scheduler_running
# suspended
#
@@ -1080,7 +1088,7 @@ class Stream():
#
@property
def suspended(self):
- return self._scheduler.suspended
+ return self._scheduler_suspended
# terminated
#
@@ -1088,7 +1096,7 @@ class Stream():
#
@property
def terminated(self):
- return self._scheduler.terminated
+ return self._scheduler_terminated
# elapsed_time
#
@@ -1096,14 +1104,15 @@ class Stream():
#
@property
def elapsed_time(self):
- return self._scheduler.elapsed_time()
+ return self._state.elapsed_time(start_time=self._starttime)
# terminate()
#
# Terminate jobs
#
def terminate(self):
- self._scheduler.terminate_jobs()
+ notification = Notification(NotificationType.TERMINATE)
+ self._notify(notification)
# quit()
#
@@ -1112,7 +1121,8 @@ class Stream():
# of ongoing jobs
#
def quit(self):
- self._scheduler.stop_queueing()
+ notification = Notification(NotificationType.QUIT)
+ self._notify(notification)
# suspend()
#
@@ -1120,8 +1130,13 @@ class Stream():
#
@contextmanager
def suspend(self):
- with self._scheduler.jobs_suspended():
- yield
+ # Send the notification to suspend jobs
+ notification = Notification(NotificationType.SUSPEND)
+ self._notify(notification)
+ yield
+ # Unsuspend jobs on context exit
+ notification = Notification(NotificationType.UNSUSPEND)
+ self._notify(notification)
#############################################################
# Private Methods #
@@ -1323,20 +1338,11 @@ class Stream():
# action_name (str): The name of the action being performed
# unique_id (str): A unique_id to load an Element instance
#
- # Raises:
- # (StreamError): If the related queue cannot be found
- #
def _failure_retry(self, action_name, unique_id):
- queue = None
- # Attempt to resolve the required queue
- for queue in self.queues:
- if queue.action_name == action_name:
- queue = queue
- if not queue:
- raise StreamError()
- element = Plugin._lookup(unique_id)
- queue._task_group.failed_tasks.remove(element._get_full_name())
- queue.enqueue([element])
+ notification = Notification(NotificationType.RETRY,
+ job_action=action_name,
+ element=unique_id)
+ self._notify(notification)
# _run()
#
@@ -1642,6 +1648,40 @@ class Stream():
return element_targets, artifact_refs
+ def _scheduler_notification_handler(self):
+ # Check the queue is there
+ assert self._notification_queue
+ notification = self._notification_queue.pop()
+
+ if notification.notification_type == NotificationType.MESSAGE:
+ self._context.messenger.message(notification.message)
+ elif notification.notification_type == NotificationType.INTERRUPT:
+ self._interrupt_callback()
+ elif notification.notification_type == NotificationType.TICK:
+ self._ticker_callback()
+ elif notification.notification_type == NotificationType.JOB_START:
+ self._state.add_task(notification.job_action, notification.full_name, notification.time)
+ elif notification.notification_type == NotificationType.JOB_COMPLETE:
+ self._state.remove_task(notification.job_action, notification.full_name)
+ if notification.job_status == JobStatus.FAIL:
+ self._state.fail_task(notification.job_action, notification.full_name,
+ notification.element)
+ elif notification.notification_type == NotificationType.SCHED_START_TIME:
+ self._starttime = notification.time
+ elif notification.notification_type == NotificationType.RUNNING:
+ self._scheduler_running = not self._scheduler_running
+ elif notification.notification_type == NotificationType.TERMINATED:
+ self._scheduler_terminated = True
+ elif notification.notification_type == NotificationType.SUSPENDED:
+ self._scheduler_suspended = not self._scheduler_suspended
+ else:
+ raise StreamError("Unrecognised notification type received")
+
+ def _notify(self, notification):
+ # Stream to scheduler notifications on left side
+ self._notification_queue.appendleft(notification)
+ self._notifier()
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and