diff options
author | bst-marge-bot <marge-bot@buildstream.build> | 2020-07-06 19:47:33 +0000 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2020-07-06 19:47:33 +0000 |
commit | 99d827faad1d38e85532e056561a967636cfc4b5 (patch) | |
tree | 55ba99140008a72e9d762b8319613bbbabe586b6 | |
parent | e79f4a019d1d4c23d442f61144e6ac5177eb36b2 (diff) | |
parent | cb2acc31743d6e208a7977288485578ca17effdd (diff) | |
download | buildstream-99d827faad1d38e85532e056561a967636cfc4b5.tar.gz |
Merge branch 'bschubert/simplify-stream-interactions' into 'master'
simplify stream interactions (Remove Notifications)
See merge request BuildStream/buildstream!1985
-rw-r--r-- | src/buildstream/_frontend/app.py | 4 | ||||
-rw-r--r-- | src/buildstream/_frontend/status.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/__init__.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 7 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 164 | ||||
-rw-r--r-- | src/buildstream/_state.py | 46 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 92 |
8 files changed, 102 insertions, 217 deletions
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py index e7831dac3..3160e8b1e 100644 --- a/src/buildstream/_frontend/app.py +++ b/src/buildstream/_frontend/app.py @@ -311,7 +311,7 @@ class App: # Print a nice summary if this is a session if session_name: - elapsed = self.stream.elapsed_time + elapsed = self._state.elapsed_time() if isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member self._message(MessageType.WARN, session_name + " Terminated", elapsed=elapsed) @@ -339,7 +339,7 @@ class App: else: # No exceptions occurred, print session time and summary if session_name: - self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time) + self._message(MessageType.SUCCESS, session_name, elapsed=self._state.elapsed_time()) if self._started: self._print_summary() diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py index a3f0d8aa7..e0505b45b 100644 --- a/src/buildstream/_frontend/status.py +++ b/src/buildstream/_frontend/status.py @@ -109,7 +109,7 @@ class Status: if not self._term_caps: return - elapsed = self._stream.elapsed_time + elapsed = self._state.elapsed_time() self.clear() self._check_term_width() diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py index d689d6e25..d2f458fa5 100644 --- a/src/buildstream/_scheduler/__init__.py +++ b/src/buildstream/_scheduler/__init__.py @@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue from .queues.artifactpushqueue import ArtifactPushQueue from .queues.pullqueue import PullQueue -from .scheduler import Scheduler, SchedStatus, Notification, NotificationType +from .scheduler import Scheduler, SchedStatus from .jobs import ElementJob, JobStatus diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 78a375fec..7ea87dc62 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -128,6 +128,7 @@ class Job: # Private members # self._scheduler = scheduler # The scheduler + self._messenger = self._scheduler.context.messenger self._pipe_r = None # The read end of a pipe for message passing self._process = None # The Process object self._listening = False # Whether the parent is currently listening @@ -163,7 +164,7 @@ class Job: child_job = self.create_child_job( # pylint: disable=assignment-from-no-return self.action_name, - self._scheduler.context.messenger, + self._messenger, self._scheduler.context.logdir, self._logfile, self._max_retries, @@ -314,7 +315,7 @@ class Job: if element_key is None: element_key = self._message_element_key message = Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs) - self._scheduler.notify_messenger(message) + self._messenger.message(message) # get_element() # @@ -470,7 +471,7 @@ class Job: if envelope.message_type is _MessageType.LOG_MESSAGE: # Propagate received messages from children # back through the context. - self._scheduler.notify_messenger(envelope.message) + self._messenger.message(envelope.message) elif envelope.message_type is _MessageType.ERROR: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 986ac6c0a..9e444b393 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -352,7 +352,7 @@ class Queue: # a message for the element they are processing def _message(self, element, message_type, brief, **kwargs): message = Message(message_type, brief, element_name=element._get_full_name(), **kwargs) - self._scheduler.notify_messenger(message) + self._scheduler.context.messenger.message(message) def _element_log_path(self, element): project = element._get_project() diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 66174ad19..3e6bf1f92 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -46,59 +46,6 @@ class SchedStatus(FastEnum): TERMINATED = 1 -# NotificationType() -# -# Type of notification for inter-process communication -# between 'front' & 'back' end when a scheduler is executing. -# This is used as a parameter for a Notification object, -# to be used as a conditional for control or state handling. -# -class NotificationType(FastEnum): - INTERRUPT = "interrupt" - JOB_START = "job_start" - JOB_COMPLETE = "job_complete" - TICK = "tick" - TERMINATE = "terminate" - QUIT = "quit" - SCHED_START_TIME = "sched_start_time" - RUNNING = "running" - TERMINATED = "terminated" - SUSPEND = "suspend" - UNSUSPEND = "unsuspend" - SUSPENDED = "suspended" - RETRY = "retry" - MESSAGE = "message" - - -# Notification() -# -# An object to be passed across a bidirectional queue between -# Stream & Scheduler. A required NotificationType() parameter -# with accompanying information can be added as a member if -# required. NOTE: The notification object should be lightweight -# and all attributes must be picklable. -# -class Notification: - def __init__( - self, - notification_type, - *, - full_name=None, - job_action=None, - job_status=None, - time=None, - element=None, - message=None - ): - self.notification_type = notification_type - self.full_name = full_name - self.job_action = job_action - self.job_status = job_status - self.time = time - self.element = element - self.message = message - - # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -120,7 +67,7 @@ class Notification: # ticker_callback: A callback call once per second # class Scheduler: - def __init__(self, context, start_time, state, notification_queue, notifier): + def __init__(self, context, start_time, state, interrupt_callback, ticker_callback): # # Public members @@ -138,7 +85,6 @@ class Scheduler: # Private members # self._active_jobs = [] # Jobs currently being run in the scheduler - self._starttime = start_time # Initial application start time self._suspendtime = None # Session time compensation for suspended state self._queue_jobs = True # Whether we should continue to queue jobs self._state = state @@ -146,11 +92,11 @@ class Scheduler: self._sched_handle = None # Whether a scheduling job is already scheduled or not - # Bidirectional queue to send notifications back to the Scheduler's owner - self._notification_queue = notification_queue - self._notifier = notifier + self._ticker_callback = ticker_callback + self._interrupt_callback = interrupt_callback self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) + self._state.register_task_retry_callback(self._failure_retry) # run() # @@ -183,9 +129,6 @@ class Scheduler: self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) - # Notify that the loop has been created - self._notify(Notification(NotificationType.RUNNING)) - # Add timeouts self.loop.call_later(1, self._tick) @@ -221,9 +164,6 @@ class Scheduler: failed = any(queue.any_failed_elements() for queue in self.queues) self.loop = None - # Notify that the loop has been reset - self._notify(Notification(NotificationType.RUNNING)) - if failed: status = SchedStatus.ERROR elif self.terminated: @@ -247,7 +187,7 @@ class Scheduler: self.queues.clear() - # terminate_jobs() + # terminate() # # Forcefully terminates all ongoing jobs. # @@ -259,7 +199,7 @@ class Scheduler: # termination is not interrupted, and SIGINT will # remain blocked after Scheduler.run() returns. # - def terminate_jobs(self): + def terminate(self): # Set this right away, the frontend will check this # attribute to decide whether or not to print status info @@ -268,35 +208,34 @@ class Scheduler: # Notify the frontend that we're terminated as it might be # from an interactive prompt callback or SIGTERM - self._notify(Notification(NotificationType.TERMINATED)) self.loop.call_soon(self._terminate_jobs_real) # Block this until we're finished terminating jobs, # this will remain blocked forever. signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT]) - # jobs_suspended() + # suspend() # - # Suspend jobs after being notified + # Suspend the scheduler # - def jobs_suspended(self): + def suspend(self): self._disconnect_signals() self._suspend_jobs() - # jobs_unsuspended() + # resume() # - # Unsuspend jobs after being notified + # Restart the scheduler # - def jobs_unsuspended(self): + def resume(self): self._resume_jobs() self._connect_signals() - # stop_queueing() + # stop() # # Stop queueing additional jobs, causes Scheduler.run() # to return once all currently processing jobs are finished. # - def stop_queueing(self): + def stop(self): self._queue_jobs = False # job_completed(): @@ -312,7 +251,6 @@ class Scheduler: # Remove from the active jobs list self._active_jobs.remove(job) - element_info = None if status == JobStatus.FAIL: # If it's an elementjob, we want to compare against the failure messages # and send the unique_id and display key tuple of the Element. This can then @@ -323,27 +261,11 @@ class Scheduler: else: element_info = None - # Now check for more jobs - notification = Notification( - NotificationType.JOB_COMPLETE, - full_name=job.name, - job_action=job.action_name, - job_status=status, - element=element_info, - ) - self._notify(notification) - self._sched() + self._state.fail_task(job.action_name, job.name, element_info) - # notify_messenger() - # - # Send message over notification queue to Messenger callback - # - # Args: - # message (Message): A Message() to be sent to the frontend message - # handler, as assigned by context's messenger. - # - def notify_messenger(self, message): - self._notify(Notification(NotificationType.MESSAGE, message=message)) + self._state.remove_task(job.action_name, job.name) + + self._sched() ####################################################### # Local Private Methods # @@ -362,10 +284,10 @@ class Scheduler: # def _abort_on_casd_failure(self, pid, returncode): message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.") - self._notify(Notification(NotificationType.MESSAGE, message=message)) + self.context.messenger.message(message) self._casd_process.returncode = returncode - self.terminate_jobs() + self.terminate() # _start_job() # @@ -384,13 +306,7 @@ class Scheduler: self._active_jobs.append(job) job.start() - notification = Notification( - NotificationType.JOB_START, - full_name=job.name, - job_action=job.action_name, - time=self._state.elapsed_time(start_time=self._starttime), - ) - self._notify(notification) + self._state.add_task(job.action_name, job.name, self._state.elapsed_time()) # _sched_queue_jobs() # @@ -435,8 +351,8 @@ class Scheduler: # Make sure fork is allowed before starting jobs if not self.context.prepare_fork(): message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active") - self._notify(Notification(NotificationType.MESSAGE, message=message)) - self.terminate_jobs() + self.context.messenger.message(message) + self.terminate() return # Start the jobs @@ -497,7 +413,6 @@ class Scheduler: self._suspendtime = datetime.datetime.now() self.suspended = True # Notify that we're suspended - self._notify(Notification(NotificationType.SUSPENDED)) for job in self._active_jobs: job.suspend() @@ -511,9 +426,7 @@ class Scheduler: job.resume() self.suspended = False # Notify that we're unsuspended - self._notify(Notification(NotificationType.SUSPENDED)) - self._starttime += datetime.datetime.now() - self._suspendtime - self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime)) + self._state.offset_start_time(datetime.datetime.now() - self._suspendtime) self._suspendtime = None # _interrupt_event(): @@ -528,15 +441,14 @@ class Scheduler: if self.terminated: return - notification = Notification(NotificationType.INTERRUPT) - self._notify(notification) + self._interrupt_callback() # _terminate_event(): # # A loop registered event callback for SIGTERM # def _terminate_event(self): - self.terminate_jobs() + self.terminate() # _suspend_event(): # @@ -582,7 +494,7 @@ class Scheduler: # Regular timeout for driving status in the UI def _tick(self): - self._notify(Notification(NotificationType.TICK)) + self._ticker_callback() self.loop.call_later(1, self._tick) def _failure_retry(self, action_name, unique_id): @@ -597,28 +509,6 @@ class Scheduler: queue._task_group.failed_tasks.remove(element._get_full_name()) queue.enqueue([element]) - def _notify(self, notification): - # Scheduler to Stream notifcations on right side - self._notification_queue.append(notification) - self._notifier() - - def _stream_notification_handler(self): - notification = self._notification_queue.popleft() - if notification.notification_type == NotificationType.TERMINATE: - self.terminate_jobs() - elif notification.notification_type == NotificationType.QUIT: - self.stop_queueing() - elif notification.notification_type == NotificationType.SUSPEND: - self.jobs_suspended() - elif notification.notification_type == NotificationType.UNSUSPEND: - self.jobs_unsuspended() - elif notification.notification_type == NotificationType.RETRY: - self._failure_retry(notification.job_action, notification.element) - else: - # Do not raise exception once scheduler process is separated - # as we don't want to pickle exceptions between processes - raise ValueError("Unrecognised notification type received") - def _handle_exception(self, loop, context: dict) -> None: e = context.get("exception") exc = bool(e) diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py index ec4f895fe..6e08c004d 100644 --- a/src/buildstream/_state.py +++ b/src/buildstream/_state.py @@ -112,6 +112,7 @@ class State: self._task_changed_cbs = [] self._task_groups_changed_cbs = [] self._task_failed_cbs = [] + self._task_retry_cbs = [] ##################################### # Frontend-facing notification APIs # @@ -226,6 +227,23 @@ class State: def unregister_task_failed_callback(self, callback): self._task_failed_cbs.remove(callback) + # register_task_retry_callback() + # + # Registers a callback to be notified when a task is to be retried + # + # Args: + # callback (function): The callback to be notified + # + # Callback Args: + # action_name (str): The name of the action, e.g. 'build' + # full_name (str): The full name of the task, distinguishing + # it from other tasks with the same action name + # e.g. an element's name. + # element_job (bool): (optionally) If an element job failed. + # + def register_task_retry_callback(self, callback): + self._task_retry_cbs.append(callback) + ############################################## # Core-facing APIs for driving notifications # ############################################## @@ -336,6 +354,20 @@ class State: for cb in self._task_failed_cbs: cb(action_name, full_name, element) + # retry_task() + # + # Notify all registered callbacks that a task is to be retried. + # + # This is a core-facing API and should not be called from the frontend + # + # Args: + # action_name: The name of the action, e.g. 'build' + # unique_id: The unique id of the plugin instance to look up + # + def retry_task(self, action_name: str, unique_id: str) -> None: + for cb in self._task_retry_cbs: + cb(action_name, unique_id) + # elapsed_time() # # Fetches the current session elapsed time @@ -354,6 +386,20 @@ class State: start_time = self._session_start or time_now return time_now - start_time + # offset_start_time() + # + # Update the 'start' time of the application by a given offset + # + # This allows modifying the time spent building when BuildStream + # gets paused then restarted, to give an accurate view of the real + # time spend building. + # + # Args: + # offset: the offset to add to the start time + # + def offset_start_time(self, offset: datetime.timedelta) -> None: + self._session_start += offset + # _Task # diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 989a00db7..cb1e84f74 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -43,9 +43,6 @@ from ._scheduler import ( BuildQueue, PullQueue, ArtifactPushQueue, - NotificationType, - Notification, - JobStatus, ) from .element import Element from ._pipeline import Pipeline @@ -91,20 +88,14 @@ class Stream: self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state self._notification_queue = deque() - self._starttime = session_start # Synchronised with Scheduler's relative start time context.messenger.set_state(self._state) - self._scheduler = Scheduler( - context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler - ) + self._scheduler = Scheduler(context, session_start, self._state, interrupt_callback, ticker_callback) self._session_start_callback = session_start_callback - self._ticker_callback = ticker_callback - self._interrupt_callback = interrupt_callback - self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler - self._scheduler_running = False - self._scheduler_terminated = False - self._scheduler_suspended = False + self._running = False + self._terminated = False + self._suspended = False # init() # @@ -1064,7 +1055,7 @@ class Stream: # @property def running(self): - return self._scheduler_running + return self._running # suspended # @@ -1072,7 +1063,7 @@ class Stream: # @property def suspended(self): - return self._scheduler_suspended + return self._suspended # terminated # @@ -1080,23 +1071,15 @@ class Stream: # @property def terminated(self): - return self._scheduler_terminated - - # elapsed_time - # - # Elapsed time since the session start - # - @property - def elapsed_time(self): - return self._state.elapsed_time(start_time=self._starttime) + return self._terminated # terminate() # # Terminate jobs # def terminate(self): - notification = Notification(NotificationType.TERMINATE) - self._notify(notification) + self._scheduler.terminate() + self._terminated = True # quit() # @@ -1105,8 +1088,7 @@ class Stream: # of ongoing jobs # def quit(self): - notification = Notification(NotificationType.QUIT) - self._notify(notification) + self._scheduler.stop() # suspend() # @@ -1114,13 +1096,11 @@ class Stream: # @contextmanager def suspend(self): - # Send the notification to suspend jobs - notification = Notification(NotificationType.SUSPEND) - self._notify(notification) + self._scheduler.suspend() + self._suspended = True yield - # Unsuspend jobs on context exit - notification = Notification(NotificationType.UNSUSPEND) - self._notify(notification) + self._suspended = False + self._scheduler.resume() ############################################################# # Private Methods # @@ -1364,12 +1344,11 @@ class Stream: # failed task from the tasks group. # # Args: - # action_name (str): The name of the action being performed - # unique_id (str): A unique_id to load an Element instance + # action_name: The name of the action being performed + # unique_id: A unique_id to load an Element instance # - def _failure_retry(self, action_name, unique_id): - notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id) - self._notify(notification) + def _failure_retry(self, action_name: str, unique_id: str) -> None: + self._state.retry_task(action_name, unique_id) # _run() # @@ -1385,7 +1364,9 @@ class Stream: if self._session_start_callback is not None: self._session_start_callback() + self._running = True status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager()) + self._running = False if status == SchedStatus.ERROR: raise StreamError() @@ -1653,39 +1634,6 @@ class Stream: return element_targets, artifact_refs - def _scheduler_notification_handler(self): - # Check the queue is there - assert self._notification_queue - notification = self._notification_queue.pop() - - if notification.notification_type == NotificationType.MESSAGE: - self._context.messenger.message(notification.message) - elif notification.notification_type == NotificationType.INTERRUPT: - self._interrupt_callback() - elif notification.notification_type == NotificationType.TICK: - self._ticker_callback() - elif notification.notification_type == NotificationType.JOB_START: - self._state.add_task(notification.job_action, notification.full_name, notification.time) - elif notification.notification_type == NotificationType.JOB_COMPLETE: - self._state.remove_task(notification.job_action, notification.full_name) - if notification.job_status == JobStatus.FAIL: - self._state.fail_task(notification.job_action, notification.full_name, notification.element) - elif notification.notification_type == NotificationType.SCHED_START_TIME: - self._starttime = notification.time - elif notification.notification_type == NotificationType.RUNNING: - self._scheduler_running = not self._scheduler_running - elif notification.notification_type == NotificationType.TERMINATED: - self._scheduler_terminated = True - elif notification.notification_type == NotificationType.SUSPENDED: - self._scheduler_suspended = not self._scheduler_suspended - else: - raise StreamError("Unrecognised notification type received") - - def _notify(self, notification): - # Stream to scheduler notifcations on left side - self._notification_queue.appendleft(notification) - self._notifier() - # _handle_compression() # |