summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan van Berkom <tristan@codethink.co.uk>2020-12-09 18:30:44 +0900
committerTristan van Berkom <tristan@codethink.co.uk>2020-12-10 18:12:37 +0900
commit4c7656f1c53b1943896f70fb9a2e35e1f8a3fbd3 (patch)
tree302c3436b66cdb20ae8013bd2af537c67b036b0c
parentcbc8e78fed4caa6b0f2b6590d7d4025c46c0e500 (diff)
downloadbuildstream-4c7656f1c53b1943896f70fb9a2e35e1f8a3fbd3.tar.gz
_state.py: Full type hintingtristan/refactor-retry-task
This improves overall documentation comments on the State object, adds full pep484 type hinting, and renames the Task.set_render_cb() to Task.set_task_changed_callback() to be more consistently named. This also adds missing frontend facing API for the group changed status notifications, even though the frontend does not currently use these, it makes better sense to have them than to remove the entire codepaths and callback lists. This also reorders the classes in this file so that Task and TaskGroup are both defined before State, this helps a bit with undefined references for type hinting information.
-rw-r--r--src/buildstream/_messenger.py2
-rw-r--r--src/buildstream/_state.py344
2 files changed, 215 insertions, 131 deletions
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 69a309f91..f18d3dc92 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -212,7 +212,7 @@ class Messenger:
self.message(message)
task = self._state.add_task(task_name, activity_name, task_name)
- task.set_render_cb(self._render_status)
+ task.set_task_changed_callback(self._render_status)
self._active_simple_tasks += 1
if not self._next_render:
self._next_render = datetime.datetime.now() + _RENDER_INTERVAL
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index 773aa2146..ddcbb09b9 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -16,7 +16,8 @@
#
import datetime
-from collections import OrderedDict
+from typing import Optional, Tuple, List, Dict, Callable
+from .types import _DisplayKey
# TaskGroup
@@ -24,23 +25,27 @@ from collections import OrderedDict
# The state data stored for a group of tasks (usually scheduler queues)
#
# Args:
-# name (str): The name of the Task Group, e.g. 'build'
-# state (State): The state object
-# complete_name (str): Optional name for frontend status rendering, e.g. 'built'
+# name: The name of the Task Group, e.g. 'build'
+# state: The state object
+# complete_name: Optional name for frontend status rendering, e.g. 'built'
#
class TaskGroup:
- def __init__(self, name, state, complete_name=None):
- self.name = name
- self.complete_name = complete_name
- self.processed_tasks = 0
- self.skipped_tasks = 0
- # NOTE: failed_tasks is a list of strings instead of an integer count
- # because the frontend requires the full list of failed tasks to
- # know whether to print failure messages for a given element.
- self.failed_tasks = []
-
- self._state = state
- self._update_task_group_cbs = []
+ def __init__(self, name: str, state: "State", complete_name: Optional[str] = None) -> None:
+
+ #
+ # Public members
+ #
+ self.name: str = name # The name of tasks in this group
+ self.complete_name: Optional[str] = complete_name # Optional name for frontend status rendering, e.g. 'built'
+
+ self.processed_tasks: int = 0 # Number of processed tasks
+ self.skipped_tasks: int = 0 # Number of skipped tasks
+ self.failed_tasks: List[str] = [] # List of element full names which failed
+
+ #
+ # Private members
+ #
+ self._state: "State" = state
###########################################
# Core-facing APIs to drive notifications #
@@ -52,7 +57,7 @@ class TaskGroup:
#
# This is a core-facing API and should not be called from the frontend
#
- def add_processed_task(self):
+ def add_processed_task(self) -> None:
self.processed_tasks += 1
for cb in self._state._task_groups_changed_cbs:
cb()
@@ -63,7 +68,7 @@ class TaskGroup:
#
# This is a core-facing API and should not be called from the frontend
#
- def add_skipped_task(self):
+ def add_skipped_task(self) -> None:
self.skipped_tasks += 1
for cb in self._state._task_groups_changed_cbs:
@@ -74,19 +79,120 @@ class TaskGroup:
# Update the TaskGroup's list of failed tasks and notify of changes
#
# Args:
- # full_name (str): The full name of the task, distinguishing
- # it from other tasks with the same action name
- # e.g. an element's name.
+ # full_name: The full name of the task, distinguishing
+ # it from other tasks with the same action name
+ # e.g. an element's name.
#
# This is a core-facing API and should not be called from the frontend
#
- def add_failed_task(self, full_name):
+ def add_failed_task(self, full_name: str) -> None:
self.failed_tasks.append(full_name)
for cb in self._state._task_groups_changed_cbs:
cb()
+# Task
+#
+# The state data stored for an individual task
+#
+# Args:
+# state: The State object
+# task_id: The unique identifier of the task
+# action_name: The name of the action, e.g. 'build'
+# full_name: The full name of the task, distinguishing
+# it from other tasks with the same action name
+# e.g. an element's name.
+# elapsed_offset: The time the task started, relative to
+# buildstream's start time.
+class Task:
+ def __init__(
+ self, state: "State", task_id: str, action_name: str, full_name: str, elapsed_offset: datetime.timedelta
+ ) -> None:
+
+ #
+ # Public members
+ #
+ self.id: str = task_id
+ self.action_name: str = action_name
+ self.full_name: str = full_name
+ self.elapsed_offset: datetime.timedelta = elapsed_offset
+ self.current_progress: Optional[int] = None
+ self.maximum_progress: Optional[int] = None
+
+ #
+ # Private members
+ #
+ self._state: "State" = state
+ self._task_changed_cb: Optional[Callable[[], None]] = None # Callback to call when something could be rendered
+
+ ##############################################
+ # Core-facing APIs for driving notifications #
+ ##############################################
+
+ # set_task_changed_callback()
+ #
+ # Sets the callback to be called when this task has
+ # changed.
+ #
+ # This is just a convenience codepath for the Messenger object
+ # run simple tasks outside of the scheduler context, rather
+ # than connecting to the State callbacks which are there for the
+ # purpose of the frontend to get notifications of task progress.
+ #
+ # Args:
+ # callback: The callback to call when progress changed
+ #
+ def set_task_changed_callback(self, callback: Optional[Callable[[], None]]) -> None:
+ self._task_changed_cb = callback
+
+ # set_maximum_progress()
+ #
+ # Sets the maximum progress possible for this task.
+ #
+ # Args:
+ # progress: The maximum progress possible for this task
+ #
+ def set_maximum_progress(self, progress: int) -> None:
+ self.maximum_progress = progress
+ self._notify_task_changed()
+
+ # set_current_progress()
+ #
+ # Sets the current progress of the task, this should
+ # be a number between 0 and the maximum progress, if a
+ # maximum progress has been set.
+ #
+ # Args:
+ # progress: The current progress
+ #
+ def set_current_progress(self, progress: int) -> None:
+ self.current_progress = progress
+ self._notify_task_changed()
+
+ # add_current_progress()
+ #
+ # A convenience function for incrementing the current
+ # progress of this task by 1.
+ #
+ def add_current_progress(self) -> None:
+ if self.current_progress is None:
+ new_progress = 1
+ else:
+ new_progress = self.current_progress + 1
+ self.set_current_progress(new_progress)
+
+ ##############################################
+ # Private methods #
+ ##############################################
+ def _notify_task_changed(self) -> None:
+ for cb in self._state._task_changed_cbs:
+ cb(self.id)
+
+ if self._task_changed_cb:
+ self._task_changed_cb()
+
+
# State
#
# The state data that is stored for the purpose of sharing with the frontend.
@@ -96,22 +202,26 @@ class TaskGroup:
# when parts of State change, and read State to know what has changed.
#
# Args:
-# session_start (datetime): The time the session started
+# session_start: The time the session started
#
class State:
- def __init__(self, session_start):
- self._session_start = session_start
-
- self.task_groups = OrderedDict() # key is TaskGroup name
-
- # Note: A Task's full_name is technically unique, but only accidentally.
- self.tasks = OrderedDict() # key is a tuple of action_name and full_name
-
- self._task_added_cbs = []
- self._task_removed_cbs = []
- self._task_changed_cbs = []
- self._task_groups_changed_cbs = []
- self._task_failed_cbs = []
+ def __init__(self, session_start: datetime.datetime) -> None:
+
+ #
+ # Public members
+ #
+ self.task_groups: Dict[str, TaskGroup] = {} # Dictionary of active task groups by group name
+ self.tasks: Dict[str, Task] = {} # Dictionary of active tasks by unique task ID
+
+ #
+ # Private members
+ #
+ self._session_start: datetime.datetime = session_start
+ self._task_added_cbs: List[Callable[[str], None]] = []
+ self._task_removed_cbs: List[Callable[[str], None]] = []
+ self._task_changed_cbs: List[Callable[[str], None]] = []
+ self._task_failed_cbs: List[Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]] = []
+ self._task_groups_changed_cbs: List[Callable[[], None]] = []
#####################################
# Frontend-facing notification APIs #
@@ -122,12 +232,12 @@ class State:
# Registers a callback to be notified when a task is added
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
# Callback Args:
- # task_id (str): The unique identifier of the task
+ # task_id: The unique identifier of the task
#
- def register_task_added_callback(self, callback):
+ def register_task_added_callback(self, callback: Callable[[str], None]) -> None:
self._task_added_cbs.append(callback)
# unregister_task_added_callback()
@@ -136,9 +246,9 @@ class State:
# register_task_added_callback()
#
# Args:
- # callback (function): The callback to be removed
+ # callback: The callback to be removed
#
- def unregister_task_added_callback(self, callback):
+ def unregister_task_added_callback(self, callback: Callable[[str], None]) -> None:
self._task_added_cbs.remove(callback)
# register_task_removed_callback()
@@ -146,12 +256,12 @@ class State:
# Registers a callback to be notified when a task is removed
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
# Callback Args:
- # task_id (str): The unique identifier of the task
+ # task_id: The unique identifier of the task
#
- def register_task_removed_callback(self, callback):
+ def register_task_removed_callback(self, callback: Callable[[str], None]) -> None:
self._task_removed_cbs.append(callback)
# unregister_task_removed_callback()
@@ -160,9 +270,9 @@ class State:
# register_task_removed_callback()
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
- def unregister_task_removed_callback(self, callback):
+ def unregister_task_removed_callback(self, callback: Callable[[str], None]) -> None:
self._task_removed_cbs.remove(callback)
# register_task_changed_callback()
@@ -170,12 +280,12 @@ class State:
# Register a callback to be notified when a task has changed
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
# Callback Args:
- # task_id (str): The unique identifier of the task
+ # task_id: The unique identifier of the task
#
- def register_task_changed_callback(self, callback):
+ def register_task_changed_callback(self, callback: Callable[[str], None]) -> None:
self._task_changed_cbs.append(callback)
# unregister_task_changed_callback()
@@ -184,9 +294,9 @@ class State:
# register_task_changed_callback()
#
# Args:
- # callback (function): The callback to be notified
+ # callback: The callback to be notified
#
- def unregister_task_changed_callback(self, callback):
+ def unregister_task_changed_callback(self, callback: Callable[[str], None]) -> None:
self._task_changed_cbs.remove(callback)
# register_task_failed_callback()
@@ -197,11 +307,12 @@ class State:
# callback (function): The callback to be notified
#
# Callback Args:
- # task_id (str): The unique identifier of the task
- # element (tuple): (optionally) The element unique_id and display keys if an
- # element job
+ # task_id: The unique identifier of the task
+ # element: (optionally) The element unique_id and DisplayKey of an element job
#
- def register_task_failed_callback(self, callback):
+ def register_task_failed_callback(
+ self, callback: Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]
+ ) -> None:
self._task_failed_cbs.append(callback)
# unregister_task_failed_callback()
@@ -212,9 +323,35 @@ class State:
# Args:
# callback (function): The callback to be removed
#
- def unregister_task_failed_callback(self, callback):
+ def unregister_task_failed_callback(
+ self, callback: Callable[[str, Optional[Tuple[int, _DisplayKey]]], None]
+ ) -> None:
self._task_failed_cbs.remove(callback)
+ # register_task_groups_changed_callback()
+ #
+ # Registers a callback to be notified whenever the task groups info has changed
+ #
+ # Args:
+ # callback: The callback to be notified
+ #
+ # Callback Args:
+ # task_id: The unique identifier of the task
+ # element: (optionally) The element unique_id and DisplayKey of an element job
+ #
+ def register_task_groups_changed_callback(self, callback: Callable[[], None]) -> None:
+ self._task_groups_changed_cbs.append(callback)
+
+ # unregister_task_groups_changed_callback()
+ #
+ # Unregisters a callback previously registered by register_task_groups_changed_callback()
+ #
+ # Args:
+ # callback (function): The callback to be removed
+ #
+ def unregister_task_groups_changed_callback(self, callback: Callable[[], None]) -> None:
+ self._task_groups_changed_cbs.remove(callback)
+
##############################################
# Core-facing APIs for driving notifications #
##############################################
@@ -232,7 +369,7 @@ class State:
# Returns:
# TaskGroup: The task group created
#
- def add_task_group(self, name, complete_name=None):
+ def add_task_group(self, name, complete_name=None) -> TaskGroup:
assert name not in self.task_groups, "Trying to add task group '{}' to '{}'".format(name, self.task_groups)
group = TaskGroup(name, self, complete_name)
self.task_groups[name] = group
@@ -248,7 +385,7 @@ class State:
# Args:
# name (str): The name of the task group, e.g. 'build'
#
- def remove_task_group(self, name):
+ def remove_task_group(self, name) -> None:
# Rely on 'del' to raise an error when removing nonexistent task groups
del self.task_groups[name]
@@ -259,9 +396,9 @@ class State:
# This is a core-facing API and should not be called from the frontend
#
# Args:
- # task_id (str): The unique identifier of the task
- # action_name (str): The name of the action, e.g. 'build'
- # full_name (str): The full name of the task, distinguishing
+ # task_id: The unique identifier of the task
+ # action_name: The name of the action, e.g. 'build'
+ # full_name: The full name of the task, distinguishing
# it from other tasks with the same action name
# e.g. an element's name.
# elapsed_offset (timedelta): (Optional) The time the task started, relative
@@ -269,7 +406,12 @@ class State:
# use this as they don't report relative to wallclock time
# if the Scheduler has been suspended.
#
- def add_task(self, task_id, action_name, full_name, elapsed_offset=None):
+ # Returns:
+ # The new task
+ #
+ def add_task(
+ self, task_id: str, action_name: str, full_name: str, elapsed_offset: Optional[datetime.timedelta] = None
+ ) -> Task:
assert task_id not in self.tasks, "Trying to add task '{}:{}' with ID '{}' to '{}'".format(
action_name, full_name, task_id, self.tasks
)
@@ -292,9 +434,9 @@ class State:
# This is a core-facing API and should not be called from the frontend
#
# Args:
- # task_id (str): The unique identifier of the task
+ # task_id: The unique identifier of the task
#
- def remove_task(self, task_id):
+ def remove_task(self, task_id: str) -> None:
# Rely on 'del' to raise an error when removing nonexistent tasks
del self.tasks[task_id]
@@ -311,11 +453,11 @@ class State:
# This is a core-facing API and should not be called from the frontend
#
# Args:
- # task_id (str): The unique identifier of the task
- # element (tuple): (optionally) The element unique_id and display keys if an
+ # task_id: The unique identifier of the task
+ # element: (optionally) The element unique_id and display keys if an
# element job
#
- def fail_task(self, task_id, element=None):
+ def fail_task(self, task_id: str, element: Optional[Tuple[int, _DisplayKey]] = None) -> None:
for cb in self._task_failed_cbs:
cb(task_id, element)
@@ -324,14 +466,14 @@ class State:
# Fetches the current session elapsed time
#
# Args:
- # start_time(time): Optional explicit start time, relative to caller.
+ # start_time: Optional explicit start time, relative to caller.
#
# Returns:
- # (timedelta): The amount of time since the start of the session,
- # discounting any time spent while jobs were suspended if
- # start_time given relative to the Scheduler
+ # The amount of time since the start of the session,
+ # discounting any time spent while jobs were suspended if
+ # start_time given relative to the Scheduler
#
- def elapsed_time(self, start_time=None):
+ def elapsed_time(self, start_time: Optional[datetime.datetime] = None) -> datetime.timedelta:
time_now = datetime.datetime.now()
if start_time is None:
start_time = self._session_start or time_now
@@ -350,61 +492,3 @@ class State:
#
def offset_start_time(self, offset: datetime.timedelta) -> None:
self._session_start += offset
-
-
-# Task
-#
-# The state data stored for an individual task
-#
-# Args:
-# state (State): The State object
-# task_id (str): The unique identifier of the task
-# action_name (str): The name of the action, e.g. 'build'
-# full_name (str): The full name of the task, distinguishing
-# it from other tasks with the same action name
-# e.g. an element's name.
-# elapsed_offset (timedelta): The time the task started, relative to
-# buildstream's start time.
-class Task:
- def __init__(self, state, task_id, action_name, full_name, elapsed_offset):
- self._state = state
- self.id = task_id
- self.action_name = action_name
- self.full_name = full_name
- self.elapsed_offset = elapsed_offset
- self.current_progress = None
- self.maximum_progress = None
-
- self._render_cb = None # Callback to call when something could be rendered
-
- # set_render_cb()
- #
- # Sets the callback to be called when the Task has changed and should be rendered
- #
- # NOTE: This should probably be removed once the frontend is running
- # separately from the scheduler, since renders could be triggered
- # by the scheduler.
- def set_render_cb(self, callback):
- self._render_cb = callback
-
- def set_current_progress(self, progress):
- self.current_progress = progress
- for cb in self._state._task_changed_cbs:
- cb(self.id)
- if self._render_cb:
- self._render_cb()
-
- def set_maximum_progress(self, progress):
- self.maximum_progress = progress
- for cb in self._state._task_changed_cbs:
- cb(self.id)
-
- if self._render_cb:
- self._render_cb()
-
- def add_current_progress(self):
- if self.current_progress is None:
- new_progress = 1
- else:
- new_progress = self.current_progress + 1
- self.set_current_progress(new_progress)