summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author bst-marge-bot <marge-bot@buildstream.build> 2020-07-06 19:47:33 +0000
committer bst-marge-bot <marge-bot@buildstream.build> 2020-07-06 19:47:33 +0000
commit 99d827faad1d38e85532e056561a967636cfc4b5 (patch)
tree 55ba99140008a72e9d762b8319613bbbabe586b6
parent e79f4a019d1d4c23d442f61144e6ac5177eb36b2 (diff)
parent cb2acc31743d6e208a7977288485578ca17effdd (diff)
download buildstream-99d827faad1d38e85532e056561a967636cfc4b5.tar.gz
Merge branch 'bschubert/simplify-stream-interactions' into 'master'
simplify stream interactions (Remove Notifications)

See merge request BuildStream/buildstream!1985
-rw-r--r-- src/buildstream/_frontend/app.py 4
-rw-r--r-- src/buildstream/_frontend/status.py 2
-rw-r--r-- src/buildstream/_scheduler/__init__.py 2
-rw-r--r-- src/buildstream/_scheduler/jobs/job.py 7
-rw-r--r-- src/buildstream/_scheduler/queues/queue.py 2
-rw-r--r-- src/buildstream/_scheduler/scheduler.py 164
-rw-r--r-- src/buildstream/_state.py 46
-rw-r--r-- src/buildstream/_stream.py 92
8 files changed, 102 insertions, 217 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index e7831dac3..3160e8b1e 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -311,7 +311,7 @@ class App:
# Print a nice summary if this is a session
if session_name:
- elapsed = self.stream.elapsed_time
+ elapsed = self._state.elapsed_time()
if isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member
self._message(MessageType.WARN, session_name + " Terminated", elapsed=elapsed)
@@ -339,7 +339,7 @@ class App:
else:
# No exceptions occurred, print session time and summary
if session_name:
- self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+ self._message(MessageType.SUCCESS, session_name, elapsed=self._state.elapsed_time())
if self._started:
self._print_summary()
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index a3f0d8aa7..e0505b45b 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -109,7 +109,7 @@ class Status:
if not self._term_caps:
return
- elapsed = self._stream.elapsed_time
+ elapsed = self._state.elapsed_time()
self.clear()
self._check_term_width()
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d689d6e25..d2f458fa5 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue
from .queues.artifactpushqueue import ArtifactPushQueue
from .queues.pullqueue import PullQueue
-from .scheduler import Scheduler, SchedStatus, Notification, NotificationType
+from .scheduler import Scheduler, SchedStatus
from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 78a375fec..7ea87dc62 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -128,6 +128,7 @@ class Job:
# Private members
#
self._scheduler = scheduler # The scheduler
+ self._messenger = self._scheduler.context.messenger
self._pipe_r = None # The read end of a pipe for message passing
self._process = None # The Process object
self._listening = False # Whether the parent is currently listening
@@ -163,7 +164,7 @@ class Job:
child_job = self.create_child_job( # pylint: disable=assignment-from-no-return
self.action_name,
- self._scheduler.context.messenger,
+ self._messenger,
self._scheduler.context.logdir,
self._logfile,
self._max_retries,
@@ -314,7 +315,7 @@ class Job:
if element_key is None:
element_key = self._message_element_key
message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
- self._scheduler.notify_messenger(message)
+ self._messenger.message(message)
# get_element()
#
@@ -470,7 +471,7 @@ class Job:
if envelope.message_type is _MessageType.LOG_MESSAGE:
# Propagate received messages from children
# back through the context.
- self._scheduler.notify_messenger(envelope.message)
+ self._messenger.message(envelope.message)
elif envelope.message_type is _MessageType.ERROR:
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 986ac6c0a..9e444b393 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -352,7 +352,7 @@ class Queue:
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs)
- self._scheduler.notify_messenger(message)
+ self._scheduler.context.messenger.message(message)
def _element_log_path(self, element):
project = element._get_project()
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 66174ad19..3e6bf1f92 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -46,59 +46,6 @@ class SchedStatus(FastEnum):
TERMINATED = 1
-# NotificationType()
-#
-# Type of notification for inter-process communication
-# between 'front' & 'back' end when a scheduler is executing.
-# This is used as a parameter for a Notification object,
-# to be used as a conditional for control or state handling.
-#
-class NotificationType(FastEnum):
- INTERRUPT = "interrupt"
- JOB_START = "job_start"
- JOB_COMPLETE = "job_complete"
- TICK = "tick"
- TERMINATE = "terminate"
- QUIT = "quit"
- SCHED_START_TIME = "sched_start_time"
- RUNNING = "running"
- TERMINATED = "terminated"
- SUSPEND = "suspend"
- UNSUSPEND = "unsuspend"
- SUSPENDED = "suspended"
- RETRY = "retry"
- MESSAGE = "message"
-
-
-# Notification()
-#
-# An object to be passed across a bidirectional queue between
-# Stream & Scheduler. A required NotificationType() parameter
-# with accompanying information can be added as a member if
-# required. NOTE: The notification object should be lightweight
-# and all attributes must be picklable.
-#
-class Notification:
- def __init__(
- self,
- notification_type,
- *,
- full_name=None,
- job_action=None,
- job_status=None,
- time=None,
- element=None,
- message=None
- ):
- self.notification_type = notification_type
- self.full_name = full_name
- self.job_action = job_action
- self.job_status = job_status
- self.time = time
- self.element = element
- self.message = message
-
-
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -120,7 +67,7 @@ class Notification:
# ticker_callback: A callback call once per second
#
class Scheduler:
- def __init__(self, context, start_time, state, notification_queue, notifier):
+ def __init__(self, context, start_time, state, interrupt_callback, ticker_callback):
#
# Public members
@@ -138,7 +85,6 @@ class Scheduler:
# Private members
#
self._active_jobs = [] # Jobs currently being run in the scheduler
- self._starttime = start_time # Initial application start time
self._suspendtime = None # Session time compensation for suspended state
self._queue_jobs = True # Whether we should continue to queue jobs
self._state = state
@@ -146,11 +92,11 @@ class Scheduler:
self._sched_handle = None # Whether a scheduling job is already scheduled or not
- # Bidirectional queue to send notifications back to the Scheduler's owner
- self._notification_queue = notification_queue
- self._notifier = notifier
+ self._ticker_callback = ticker_callback
+ self._interrupt_callback = interrupt_callback
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
+ self._state.register_task_retry_callback(self._failure_retry)
# run()
#
@@ -183,9 +129,6 @@ class Scheduler:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
- # Notify that the loop has been created
- self._notify(Notification(NotificationType.RUNNING))
-
# Add timeouts
self.loop.call_later(1, self._tick)
@@ -221,9 +164,6 @@ class Scheduler:
failed = any(queue.any_failed_elements() for queue in self.queues)
self.loop = None
- # Notify that the loop has been reset
- self._notify(Notification(NotificationType.RUNNING))
-
if failed:
status = SchedStatus.ERROR
elif self.terminated:
@@ -247,7 +187,7 @@ class Scheduler:
self.queues.clear()
- # terminate_jobs()
+ # terminate()
#
# Forcefully terminates all ongoing jobs.
#
@@ -259,7 +199,7 @@ class Scheduler:
# termination is not interrupted, and SIGINT will
# remain blocked after Scheduler.run() returns.
#
- def terminate_jobs(self):
+ def terminate(self):
# Set this right away, the frontend will check this
# attribute to decide whether or not to print status info
@@ -268,35 +208,34 @@ class Scheduler:
# Notify the frontend that we're terminated as it might be
# from an interactive prompt callback or SIGTERM
- self._notify(Notification(NotificationType.TERMINATED))
self.loop.call_soon(self._terminate_jobs_real)
# Block this until we're finished terminating jobs,
# this will remain blocked forever.
signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
- # jobs_suspended()
+ # suspend()
#
- # Suspend jobs after being notified
+ # Suspend the scheduler
#
- def jobs_suspended(self):
+ def suspend(self):
self._disconnect_signals()
self._suspend_jobs()
- # jobs_unsuspended()
+ # resume()
#
- # Unsuspend jobs after being notified
+ # Restart the scheduler
#
- def jobs_unsuspended(self):
+ def resume(self):
self._resume_jobs()
self._connect_signals()
- # stop_queueing()
+ # stop()
#
# Stop queueing additional jobs, causes Scheduler.run()
# to return once all currently processing jobs are finished.
#
- def stop_queueing(self):
+ def stop(self):
self._queue_jobs = False
# job_completed():
@@ -312,7 +251,6 @@ class Scheduler:
# Remove from the active jobs list
self._active_jobs.remove(job)
- element_info = None
if status == JobStatus.FAIL:
# If it's an elementjob, we want to compare against the failure messages
# and send the unique_id and display key tuple of the Element. This can then
@@ -323,27 +261,11 @@ class Scheduler:
else:
element_info = None
- # Now check for more jobs
- notification = Notification(
- NotificationType.JOB_COMPLETE,
- full_name=job.name,
- job_action=job.action_name,
- job_status=status,
- element=element_info,
- )
- self._notify(notification)
- self._sched()
+ self._state.fail_task(job.action_name, job.name, element_info)
- # notify_messenger()
- #
- # Send message over notification queue to Messenger callback
- #
- # Args:
- # message (Message): A Message() to be sent to the frontend message
- # handler, as assigned by context's messenger.
- #
- def notify_messenger(self, message):
- self._notify(Notification(NotificationType.MESSAGE, message=message))
+ self._state.remove_task(job.action_name, job.name)
+
+ self._sched()
#######################################################
# Local Private Methods #
@@ -362,10 +284,10 @@ class Scheduler:
#
def _abort_on_casd_failure(self, pid, returncode):
message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
- self._notify(Notification(NotificationType.MESSAGE, message=message))
+ self.context.messenger.message(message)
self._casd_process.returncode = returncode
- self.terminate_jobs()
+ self.terminate()
# _start_job()
#
@@ -384,13 +306,7 @@ class Scheduler:
self._active_jobs.append(job)
job.start()
- notification = Notification(
- NotificationType.JOB_START,
- full_name=job.name,
- job_action=job.action_name,
- time=self._state.elapsed_time(start_time=self._starttime),
- )
- self._notify(notification)
+ self._state.add_task(job.action_name, job.name, self._state.elapsed_time())
# _sched_queue_jobs()
#
@@ -435,8 +351,8 @@ class Scheduler:
# Make sure fork is allowed before starting jobs
if not self.context.prepare_fork():
message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
- self._notify(Notification(NotificationType.MESSAGE, message=message))
- self.terminate_jobs()
+ self.context.messenger.message(message)
+ self.terminate()
return
# Start the jobs
@@ -497,7 +413,6 @@ class Scheduler:
self._suspendtime = datetime.datetime.now()
self.suspended = True
# Notify that we're suspended
- self._notify(Notification(NotificationType.SUSPENDED))
for job in self._active_jobs:
job.suspend()
@@ -511,9 +426,7 @@ class Scheduler:
job.resume()
self.suspended = False
# Notify that we're unsuspended
- self._notify(Notification(NotificationType.SUSPENDED))
- self._starttime += datetime.datetime.now() - self._suspendtime
- self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
+ self._state.offset_start_time(datetime.datetime.now() - self._suspendtime)
self._suspendtime = None
# _interrupt_event():
@@ -528,15 +441,14 @@ class Scheduler:
if self.terminated:
return
- notification = Notification(NotificationType.INTERRUPT)
- self._notify(notification)
+ self._interrupt_callback()
# _terminate_event():
#
# A loop registered event callback for SIGTERM
#
def _terminate_event(self):
- self.terminate_jobs()
+ self.terminate()
# _suspend_event():
#
@@ -582,7 +494,7 @@ class Scheduler:
# Regular timeout for driving status in the UI
def _tick(self):
- self._notify(Notification(NotificationType.TICK))
+ self._ticker_callback()
self.loop.call_later(1, self._tick)
def _failure_retry(self, action_name, unique_id):
@@ -597,28 +509,6 @@ class Scheduler:
queue._task_group.failed_tasks.remove(element._get_full_name())
queue.enqueue([element])
- def _notify(self, notification):
- # Scheduler to Stream notifcations on right side
- self._notification_queue.append(notification)
- self._notifier()
-
- def _stream_notification_handler(self):
- notification = self._notification_queue.popleft()
- if notification.notification_type == NotificationType.TERMINATE:
- self.terminate_jobs()
- elif notification.notification_type == NotificationType.QUIT:
- self.stop_queueing()
- elif notification.notification_type == NotificationType.SUSPEND:
- self.jobs_suspended()
- elif notification.notification_type == NotificationType.UNSUSPEND:
- self.jobs_unsuspended()
- elif notification.notification_type == NotificationType.RETRY:
- self._failure_retry(notification.job_action, notification.element)
- else:
- # Do not raise exception once scheduler process is separated
- # as we don't want to pickle exceptions between processes
- raise ValueError("Unrecognised notification type received")
-
def _handle_exception(self, loop, context: dict) -> None:
e = context.get("exception")
exc = bool(e)
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index ec4f895fe..6e08c004d 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -112,6 +112,7 @@ class State:
self._task_changed_cbs = []
self._task_groups_changed_cbs = []
self._task_failed_cbs = []
+ self._task_retry_cbs = []
#####################################
# Frontend-facing notification APIs #
@@ -226,6 +227,23 @@ class State:
def unregister_task_failed_callback(self, callback):
self._task_failed_cbs.remove(callback)
+ # register_task_retry_callback()
+ #
+ # Registers a callback to be notified when a task is to be retried
+ #
+ # Args:
+ # callback (function): The callback to be notified
+ #
+ # Callback Args:
+ # action_name (str): The name of the action, e.g. 'build'
+ # unique_id (str): The unique id of the plugin instance to
+ # look up, i.e. the element whose failed
+ # task is to be retried, matching the
+ # arguments passed by State.retry_task().
+ #
+ def register_task_retry_callback(self, callback):
+ self._task_retry_cbs.append(callback)
+
##############################################
# Core-facing APIs for driving notifications #
##############################################
@@ -336,6 +354,20 @@ class State:
for cb in self._task_failed_cbs:
cb(action_name, full_name, element)
+ # retry_task()
+ #
+ # Notify all registered callbacks that a task is to be retried.
+ #
+ # This is a core-facing API and should not be called from the frontend
+ #
+ # Args:
+ # action_name: The name of the action, e.g. 'build'
+ # unique_id: The unique id of the plugin instance to look up
+ #
+ def retry_task(self, action_name: str, unique_id: str) -> None:
+ for cb in self._task_retry_cbs:
+ cb(action_name, unique_id)
+
# elapsed_time()
#
# Fetches the current session elapsed time
@@ -354,6 +386,20 @@ class State:
start_time = self._session_start or time_now
return time_now - start_time
+ # offset_start_time()
+ #
+ # Update the 'start' time of the application by a given offset
+ #
+ # This allows modifying the time spent building when BuildStream
+ # gets paused then restarted, to give an accurate view of the real
+ # time spent building.
+ #
+ # Args:
+ # offset: the offset to add to the start time
+ #
+ def offset_start_time(self, offset: datetime.timedelta) -> None:
+ self._session_start += offset
+
# _Task
#
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 989a00db7..cb1e84f74 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -43,9 +43,6 @@ from ._scheduler import (
BuildQueue,
PullQueue,
ArtifactPushQueue,
- NotificationType,
- Notification,
- JobStatus,
)
from .element import Element
from ._pipeline import Pipeline
@@ -91,20 +88,14 @@ class Stream:
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
self._notification_queue = deque()
- self._starttime = session_start # Synchronised with Scheduler's relative start time
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(
- context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler
- )
+ self._scheduler = Scheduler(context, session_start, self._state, interrupt_callback, ticker_callback)
self._session_start_callback = session_start_callback
- self._ticker_callback = ticker_callback
- self._interrupt_callback = interrupt_callback
- self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler
- self._scheduler_running = False
- self._scheduler_terminated = False
- self._scheduler_suspended = False
+ self._running = False
+ self._terminated = False
+ self._suspended = False
# init()
#
@@ -1064,7 +1055,7 @@ class Stream:
#
@property
def running(self):
- return self._scheduler_running
+ return self._running
# suspended
#
@@ -1072,7 +1063,7 @@ class Stream:
#
@property
def suspended(self):
- return self._scheduler_suspended
+ return self._suspended
# terminated
#
@@ -1080,23 +1071,15 @@ class Stream:
#
@property
def terminated(self):
- return self._scheduler_terminated
-
- # elapsed_time
- #
- # Elapsed time since the session start
- #
- @property
- def elapsed_time(self):
- return self._state.elapsed_time(start_time=self._starttime)
+ return self._terminated
# terminate()
#
# Terminate jobs
#
def terminate(self):
- notification = Notification(NotificationType.TERMINATE)
- self._notify(notification)
+ self._scheduler.terminate()
+ self._terminated = True
# quit()
#
@@ -1105,8 +1088,7 @@ class Stream:
# of ongoing jobs
#
def quit(self):
- notification = Notification(NotificationType.QUIT)
- self._notify(notification)
+ self._scheduler.stop()
# suspend()
#
@@ -1114,13 +1096,11 @@ class Stream:
#
@contextmanager
def suspend(self):
- # Send the notification to suspend jobs
- notification = Notification(NotificationType.SUSPEND)
- self._notify(notification)
+ self._scheduler.suspend()
+ self._suspended = True
yield
- # Unsuspend jobs on context exit
- notification = Notification(NotificationType.UNSUSPEND)
- self._notify(notification)
+ self._suspended = False
+ self._scheduler.resume()
#############################################################
# Private Methods #
@@ -1364,12 +1344,11 @@ class Stream:
# failed task from the tasks group.
#
# Args:
- # action_name (str): The name of the action being performed
- # unique_id (str): A unique_id to load an Element instance
+ # action_name: The name of the action being performed
+ # unique_id: A unique_id to load an Element instance
#
- def _failure_retry(self, action_name, unique_id):
- notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id)
- self._notify(notification)
+ def _failure_retry(self, action_name: str, unique_id: str) -> None:
+ self._state.retry_task(action_name, unique_id)
# _run()
#
@@ -1385,7 +1364,9 @@ class Stream:
if self._session_start_callback is not None:
self._session_start_callback()
+ self._running = True
status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
+ self._running = False
if status == SchedStatus.ERROR:
raise StreamError()
@@ -1653,39 +1634,6 @@ class Stream:
return element_targets, artifact_refs
- def _scheduler_notification_handler(self):
- # Check the queue is there
- assert self._notification_queue
- notification = self._notification_queue.pop()
-
- if notification.notification_type == NotificationType.MESSAGE:
- self._context.messenger.message(notification.message)
- elif notification.notification_type == NotificationType.INTERRUPT:
- self._interrupt_callback()
- elif notification.notification_type == NotificationType.TICK:
- self._ticker_callback()
- elif notification.notification_type == NotificationType.JOB_START:
- self._state.add_task(notification.job_action, notification.full_name, notification.time)
- elif notification.notification_type == NotificationType.JOB_COMPLETE:
- self._state.remove_task(notification.job_action, notification.full_name)
- if notification.job_status == JobStatus.FAIL:
- self._state.fail_task(notification.job_action, notification.full_name, notification.element)
- elif notification.notification_type == NotificationType.SCHED_START_TIME:
- self._starttime = notification.time
- elif notification.notification_type == NotificationType.RUNNING:
- self._scheduler_running = not self._scheduler_running
- elif notification.notification_type == NotificationType.TERMINATED:
- self._scheduler_terminated = True
- elif notification.notification_type == NotificationType.SUSPENDED:
- self._scheduler_suspended = not self._scheduler_suspended
- else:
- raise StreamError("Unrecognised notification type received")
-
- def _notify(self, notification):
- # Stream to scheduler notifcations on left side
- self._notification_queue.appendleft(notification)
- self._notifier()
-
# _handle_compression()
#