# # Copyright (C) 2019 Bloomberg Finance LP # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . # import datetime from collections import OrderedDict # TaskGroup # # The state data stored for a group of tasks (usually scheduler queues) # # Args: # name (str): The name of the Task Group, e.g. 'build' # state (State): The state object # complete_name (str): Optional name for frontend status rendering, e.g. 'built' # class TaskGroup: def __init__(self, name, state, complete_name=None): self.name = name self.complete_name = complete_name self.processed_tasks = 0 self.skipped_tasks = 0 # NOTE: failed_tasks is a list of strings instead of an integer count # because the frontend requires the full list of failed tasks to # know whether to print failure messages for a given element. self.failed_tasks = [] self._state = state self._update_task_group_cbs = [] ########################################### # Core-facing APIs to drive notifications # ########################################### # add_processed_task() # # Update the TaskGroup's count of processed tasks and notify of changes # # This is a core-facing API and should not be called from the frontend # def add_processed_task(self): self.processed_tasks += 1 for cb in self._state._task_groups_changed_cbs: cb() # add_skipped_task() # # Update the TaskGroup's count of skipped tasks and notify of changes # # This is a core-facing API and should not be called from the frontend # def add_skipped_task(self): self.skipped_tasks += 1 for cb in self._state._task_groups_changed_cbs: cb() # add_failed_task() # # Update the TaskGroup's list of failed tasks and notify of changes # # Args: # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # # This is a core-facing API and should not be called from the frontend # def add_failed_task(self, full_name): self.failed_tasks.append(full_name) for cb in self._state._task_groups_changed_cbs: cb() # State # # The state data that is stored for the purpose of sharing with the frontend. # # BuildStream's Core is responsible for making changes to this data. # BuildStream's Frontend may register callbacks with State to be notified # when parts of State change, and read State to know what has changed. # # Args: # session_start (datetime): The time the session started # class State: def __init__(self, session_start): self._session_start = session_start self.task_groups = OrderedDict() # key is TaskGroup name # Note: A Task's full_name is technically unique, but only accidentally. self.tasks = OrderedDict() # key is a tuple of action_name and full_name self._task_added_cbs = [] self._task_removed_cbs = [] self._task_changed_cbs = [] self._task_groups_changed_cbs = [] self._task_failed_cbs = [] ##################################### # Frontend-facing notification APIs # ##################################### # register_task_added_callback() # # Registers a callback to be notified when a task is added # # Args: # callback (function): The callback to be notified # # Callback Args: # action_name (str): The name of the action, e.g. 'build' # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # def register_task_added_callback(self, callback): self._task_added_cbs.append(callback) # unregister_task_added_callback() # # Unregisters a callback previously registered by # register_task_added_callback() # # Args: # callback (function): The callback to be removed # def unregister_task_added_callback(self, callback): self._task_added_cbs.remove(callback) # register_task_removed_callback() # # Registers a callback to be notified when a task is removed # # Args: # callback (function): The callback to be notified # # Callback Args: # action_name (str): The name of the action, e.g. 'build' # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # def register_task_removed_callback(self, callback): self._task_removed_cbs.append(callback) # unregister_task_removed_callback() # # Unregisters a callback previously registered by # register_task_removed_callback() # # Args: # callback (function): The callback to be notified # def unregister_task_removed_callback(self, callback): self._task_removed_cbs.remove(callback) # register_task_changed_callback() # # Register a callback to be notified when a task has changed # # Args: # callback (function): The callback to be notified # # Callback Args: # action_name (str): The name of the action, e.g. 'build' # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # def register_task_changed_callback(self, callback): self._task_changed_cbs.append(callback) # unregister_task_changed_callback() # # Unregisters a callback previously registered by # register_task_changed_callback() # # Args: # callback (function): The callback to be notified # def unregister_task_changed_callback(self, callback): self._task_changed_cbs.remove(callback) # register_task_failed_callback() # # Registers a callback to be notified when a task has failed # # Args: # callback (function): The callback to be notified # # Callback Args: # action_name (str): The name of the action, e.g. 'build' # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # element_job (bool): (optionally) If an element job failed. # def register_task_failed_callback(self, callback): self._task_failed_cbs.append(callback) # unregister_task_failed_callback() # # Unregisters a callback previously registered by # register_task_failed_callback() # # Args: # callback (function): The callback to be removed # def unregister_task_failed_callback(self, callback): self._task_failed_cbs.remove(callback) ############################################## # Core-facing APIs for driving notifications # ############################################## # add_task_group() # # Notification that a new task group has been added # # This is a core-facing API and should not be called from the frontend # # Args: # name (str): The name of the task group, e.g. 'build' # complete_name (str): Optional name to be used for frontend status rendering, e.g. 'built' # # Returns: # TaskGroup: The task group created # def add_task_group(self, name, complete_name=None): assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups) group = TaskGroup(name, self, complete_name) self.task_groups[name] = group return group # remove_task_group() # # Notification that a task group has been removed # # This is a core-facing API and should not be called from the frontend # # Args: # name (str): The name of the task group, e.g. 'build' # def remove_task_group(self, name): # Rely on 'del' to raise an error when removing nonexistent task groups del self.task_groups[name] # add_task() # # Add a task and send appropriate notifications # # This is a core-facing API and should not be called from the frontend # # Args: # action_name (str): The name of the action, e.g. 'build' # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # elapsed_offset (timedelta): (Optional) The time the task started, relative # to buildstream's start time. Note scheduler tasks # use this as they don't report relative to wallclock time # if the Scheduler has been suspended. # def add_task(self, action_name, full_name, elapsed_offset=None): task_key = (action_name, full_name) assert task_key not in self.tasks, "Trying to add task '{}:{}' to '{}'".format( action_name, full_name, self.tasks ) if not elapsed_offset: elapsed_offset = self.elapsed_time() task = _Task(self, action_name, full_name, elapsed_offset) self.tasks[task_key] = task for cb in self._task_added_cbs: cb(action_name, full_name) return task # remove_task() # # Remove the task and send appropriate notifications # # This is a core-facing API and should not be called from the frontend # # Args: # action_name (str): The name of the action, e.g. 'build' # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # def remove_task(self, action_name, full_name): # Rely on 'del' to raise an error when removing nonexistent tasks del self.tasks[(action_name, full_name)] for cb in self._task_removed_cbs: cb(action_name, full_name) # fail_task() # # Notify all registered callbacks that a task has failed. # # This is separate from the tasks changed callbacks because a failed task # requires the frontend to intervene to decide what happens next. # # This is a core-facing API and should not be called from the frontend # # Args: # action_name (str): The name of the action, e.g. 'build' # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # element (tuple): (optionally) The element unique_id and display keys if an # element job # def fail_task(self, action_name, full_name, element=None): for cb in self._task_failed_cbs: cb(action_name, full_name, element) # elapsed_time() # # Fetches the current session elapsed time # # Args: # start_time(time): Optional explicit start time, relative to caller. # # Returns: # (timedelta): The amount of time since the start of the session, # discounting any time spent while jobs were suspended if # start_time given relative to the Scheduler # def elapsed_time(self, start_time=None): time_now = datetime.datetime.now() if start_time is None: start_time = self._session_start or time_now return time_now - start_time # _Task # # The state data stored for an individual task # # Args: # state (State): The State object # action_name (str): The name of the action, e.g. 'build' # full_name (str): The full name of the task, distinguishing # it from other tasks with the same action name # e.g. an element's name. # elapsed_offset (timedelta): The time the task started, relative to # buildstream's start time. class _Task: def __init__(self, state, action_name, full_name, elapsed_offset): self._state = state self.action_name = action_name self.full_name = full_name self.elapsed_offset = elapsed_offset self.current_progress = None self.maximum_progress = None self._render_cb = None # Callback to call when something could be rendered # set_render_cb() # # Sets the callback to be called when the Task has changed and should be rendered # # NOTE: This should probably be removed once the frontend is running # separately from the scheduler, since renders could be triggered # by the scheduler. def set_render_cb(self, callback): self._render_cb = callback def set_current_progress(self, progress): self.current_progress = progress for cb in self._state._task_changed_cbs: cb(self.action_name, self.full_name) if self._render_cb: self._render_cb() def set_maximum_progress(self, progress): self.maximum_progress = progress for cb in self._state._task_changed_cbs: cb(self.action_name, self.full_name) if self._render_cb: self._render_cb() def add_current_progress(self): if self.current_progress is None: new_progress = 1 else: new_progress = self.current_progress + 1 self.set_current_progress(new_progress) def add_maximum_progress(self): self.set_maximum_progress(self.maximum_progress or 0 + 1)