#
#  Copyright (C) 2019 Bloomberg Finance LP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see http://www.gnu.org/licenses/.
#

from collections import OrderedDict


# _notify()
#
# Invoke every callback in a list with the given arguments.
#
# The list is copied before it is iterated, so a callback may register or
# unregister callbacks (including itself) while being notified without
# causing another callback to be skipped in the same notification.
# Callbacks registered during a notification are first invoked on the
# next notification.
#
# Args:
#    callbacks (list): The callbacks to invoke, in registration order
#    *args: The positional arguments passed on to each callback
#
def _notify(callbacks, *args):
    for callback in list(callbacks):
        callback(*args)


# TaskGroup
#
# The state data stored for a group of tasks (usually scheduler queues)
#
# Args:
#    name (str): The name of the Task Group, e.g. 'build'
#    state (State): The state object
#
class TaskGroup():
    def __init__(self, name, state):
        self.name = name
        self.processed_tasks = 0
        self.skipped_tasks = 0
        # NOTE: failed_tasks is a list of strings instead of an integer count
        #       because the frontend requires the full list of failed tasks to
        #       know whether to print failure messages for a given element.
        self.failed_tasks = []

        self._state = state
        self._update_task_group_cbs = []

    ###########################################
    # Core-facing APIs to drive notifications #
    ###########################################

    # add_processed_task()
    #
    # Update the TaskGroup's count of processed tasks and notify of changes
    #
    # This is a core-facing API and should not be called from the frontend
    #
    def add_processed_task(self):
        self.processed_tasks += 1
        self._notify_changed()

    # add_skipped_task()
    #
    # Update the TaskGroup's count of skipped tasks and notify of changes
    #
    # This is a core-facing API and should not be called from the frontend
    #
    def add_skipped_task(self):
        self.skipped_tasks += 1
        self._notify_changed()

    # add_failed_task()
    #
    # Update the TaskGroup's list of failed tasks and notify of changes
    #
    # Args:
    #    full_name (str): The full name of the task, distinguishing
    #                     it from other tasks with the same action name
    #                     e.g. an element's name.
    #
    # This is a core-facing API and should not be called from the frontend
    #
    def add_failed_task(self, full_name):
        self.failed_tasks.append(full_name)
        self._notify_changed()

    # _notify_changed()
    #
    # Invoke the owning State's "task groups changed" callbacks.
    #
    # The callbacks take no arguments; they are expected to read the
    # changed values back from the State / TaskGroup.
    #
    def _notify_changed(self):
        _notify(self._state._task_groups_changed_cbs)


# State
#
# The state data that is stored for the purpose of sharing with the frontend.
#
# BuildStream's Core is responsible for making changes to this data.
# BuildStream's Frontend may register callbacks with State to be notified
# when parts of State change, and read State to know what has changed.
class State():
    def __init__(self):
        self.task_groups = OrderedDict()  # key is TaskGroup name

        # Note: A Task's full_name is technically unique, but only accidentally.
        self.tasks = OrderedDict()  # key is a tuple of action_name and full_name

        self._task_added_cbs = []
        self._task_removed_cbs = []
        self._task_groups_changed_cbs = []
        self._task_failed_cbs = []

    #####################################
    # Frontend-facing notification APIs #
    #####################################

    # register_task_added_callback()
    #
    # Registers a callback to be notified when a task is added
    #
    # Args:
    #    callback (function): The callback to be notified
    #
    # Callback Args:
    #    action_name (str): The name of the action, e.g. 'build'
    #    full_name (str): The full name of the task, distinguishing
    #                     it from other tasks with the same action name
    #                     e.g. an element's name.
    #
    def register_task_added_callback(self, callback):
        self._task_added_cbs.append(callback)

    # unregister_task_added_callback()
    #
    # Unregisters a callback previously registered by
    # register_task_added_callback()
    #
    # Raises ValueError (from list.remove) if the callback is not registered.
    #
    # Args:
    #    callback (function): The callback to be removed
    #
    def unregister_task_added_callback(self, callback):
        self._task_added_cbs.remove(callback)

    # register_task_removed_callback()
    #
    # Registers a callback to be notified when a task is removed
    #
    # Args:
    #    callback (function): The callback to be notified
    #
    # Callback Args:
    #    action_name (str): The name of the action, e.g. 'build'
    #    full_name (str): The full name of the task, distinguishing
    #                     it from other tasks with the same action name
    #                     e.g. an element's name.
    #
    def register_task_removed_callback(self, callback):
        self._task_removed_cbs.append(callback)

    # unregister_task_removed_callback()
    #
    # Unregisters a callback previously registered by
    # register_task_removed_callback()
    #
    # Raises ValueError (from list.remove) if the callback is not registered.
    #
    # Args:
    #    callback (function): The callback to be removed
    #
    def unregister_task_removed_callback(self, callback):
        self._task_removed_cbs.remove(callback)

    # register_task_failed_callback()
    #
    # Registers a callback to be notified when a task has failed
    #
    # Args:
    #    callback (function): The callback to be notified
    #
    # Callback Args:
    #    action_name (str): The name of the action, e.g. 'build'
    #    full_name (str): The full name of the task, distinguishing
    #                     it from other tasks with the same action name
    #                     e.g. an element's name.
    #    unique_id (int): (optionally) the element's unique ID, if the failure
    #                     came from an element
    #
    def register_task_failed_callback(self, callback):
        self._task_failed_cbs.append(callback)

    # unregister_task_failed_callback()
    #
    # Unregisters a callback previously registered by
    # register_task_failed_callback()
    #
    # Raises ValueError (from list.remove) if the callback is not registered.
    #
    # Args:
    #    callback (function): The callback to be removed
    #
    def unregister_task_failed_callback(self, callback):
        self._task_failed_cbs.remove(callback)

    ##############################################
    # Core-facing APIs for driving notifications #
    ##############################################

    # add_task_group()
    #
    # Notification that a new task group has been added
    #
    # This is a core-facing API and should not be called from the frontend
    #
    # Args:
    #    name (str): The name of the task group, e.g. 'build'
    #
    # Returns:
    #    TaskGroup: The task group created
    #
    def add_task_group(self, name):
        assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups)

        group = TaskGroup(name, self)
        self.task_groups[name] = group

        return group

    # remove_task_group()
    #
    # Notification that a task group has been removed
    #
    # This is a core-facing API and should not be called from the frontend
    #
    # Args:
    #    name (str): The name of the task group, e.g. 'build'
    #
    def remove_task_group(self, name):
        # Rely on 'del' to raise an error when removing nonexistent task groups
        del self.task_groups[name]

    # add_task()
    #
    # Add a task and send appropriate notifications
    #
    # This is a core-facing API and should not be called from the frontend
    #
    # Args:
    #    action_name (str): The name of the action, e.g. 'build'
    #    full_name (str): The full name of the task, distinguishing
    #                     it from other tasks with the same action name
    #                     e.g. an element's name.
    #    start_time (timedelta): The time the task started, relative to
    #                            buildstream's start time.
    #
    def add_task(self, action_name, full_name, start_time):
        task_key = (action_name, full_name)
        assert task_key not in self.tasks, \
            "Trying to add task '{}:{}' to '{}'".format(action_name, full_name, self.tasks)

        # Store the task before notifying, so callbacks can look it up
        self.tasks[task_key] = _Task(action_name, full_name, start_time)

        _notify(self._task_added_cbs, action_name, full_name)

    # remove_task()
    #
    # Remove the task and send appropriate notifications
    #
    # This is a core-facing API and should not be called from the frontend
    #
    # Args:
    #    action_name (str): The name of the action, e.g. 'build'
    #    full_name (str): The full name of the task, distinguishing
    #                     it from other tasks with the same action name
    #                     e.g. an element's name.
    #
    def remove_task(self, action_name, full_name):
        # Rely on 'del' to raise an error when removing nonexistent tasks
        del self.tasks[(action_name, full_name)]

        _notify(self._task_removed_cbs, action_name, full_name)

    # fail_task()
    #
    # Notify all registered callbacks that a task has failed.
    #
    # This is separate from the tasks changed callbacks because a failed task
    # requires the frontend to intervene to decide what happens next.
    #
    # This is a core-facing API and should not be called from the frontend
    #
    # Args:
    #    action_name (str): The name of the action, e.g. 'build'
    #    full_name (str): The full name of the task, distinguishing
    #                     it from other tasks with the same action name
    #                     e.g. an element's name.
    #    unique_id (int): (optionally) the element's unique ID, if the failure came from an element
    #
    def fail_task(self, action_name, full_name, unique_id=None):
        _notify(self._task_failed_cbs, action_name, full_name, unique_id)


# _Task
#
# The state data stored for an individual task
#
# Args:
#    action_name (str): The name of the action, e.g. 'build'
#    full_name (str): The full name of the task, distinguishing
#                     it from other tasks with the same action name
#                     e.g. an element's name.
#    start_time (timedelta): The time the task started, relative to
#                            buildstream's start time.
class _Task():
    def __init__(self, action_name, full_name, start_time):
        self.action_name = action_name
        self.full_name = full_name
        self.start_time = start_time