summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-08-20 16:36:53 +0100
committerbst-marge-bot <marge-bot@buildstream.build>2019-09-10 10:44:53 +0000
commitbe757fab5e8dc3d603e8f61e6ffbbf00053467a6 (patch)
tree464d78dc7f3e89af0c01b519850c0a77e8c6f6be
parent56365356a20540c3319107fe647d0852321a64cb (diff)
downloadbuildstream-be757fab5e8dc3d603e8f61e6ffbbf00053467a6.tar.gz
scheduler.py: Move elapsed_time() to shared State()
The starttime relative to the scheduler, used to track deltas for time spent not suspended across Stream--Scheduler is now synced via a notification. This removes the need to call into the scheduler from the 'frontend' to determine the time difference for scheduler relative queries.
-rw-r--r--src/buildstream/_scheduler/scheduler.py23
-rw-r--r--src/buildstream/_state.py24
-rw-r--r--src/buildstream/_stream.py7
3 files changed, 32 insertions, 22 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index e6a12e81c..7ac0559b7 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -54,6 +54,7 @@ class NotificationType(FastEnum):
TICK = "tick"
TERMINATE = "terminate"
QUIT = "quit"
+ SCHED_START_TIME = "sched_start_time"
# Notification()
@@ -72,13 +73,13 @@ class Notification():
full_name=None,
job_action=None,
job_status=None,
- elapsed_time=None,
+ time=None,
element=None):
self.notification_type = notification_type
self.full_name = full_name
self.job_action = job_action
self.job_status = job_status
- self.elapsed_time = elapsed_time
+ self.time = time
self.element = element
@@ -249,21 +250,6 @@ class Scheduler():
def stop_queueing(self):
self._queue_jobs = False
- # elapsed_time()
- #
- # Fetches the current session elapsed time
- #
- # Returns:
- # (timedelta): The amount of time since the start of the session,
- # discounting any time spent while jobs were suspended.
- #
- def elapsed_time(self):
- timenow = datetime.datetime.now()
- starttime = self._starttime
- if not starttime:
- starttime = timenow
- return timenow - starttime
-
# job_completed():
#
# Called when a Job completes
@@ -313,7 +299,7 @@ class Scheduler():
notification = Notification(NotificationType.JOB_START,
full_name=job.name,
job_action=job.action_name,
- elapsed_time=self.elapsed_time())
+ time=self._state.elapsed_time(start_time=self._starttime))
self._notify(notification)
job.start()
@@ -411,6 +397,7 @@ class Scheduler():
job.resume()
self.suspended = False
self._starttime += (datetime.datetime.now() - self._suspendtime)
+ self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
self._suspendtime = None
# _interrupt_event():
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index df3bceff2..c99434018 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -272,7 +272,9 @@ class State():
# it from other tasks with the same action name
# e.g. an element's name.
# elapsed_offset (timedelta): (Optional) The time the task started, relative
- # to buildstream's start time.
+ # to buildstream's start time. Note scheduler tasks
+ # use this as they don't report relative to wallclock time
+ # if the Scheduler has been suspended.
#
def add_task(self, action_name, full_name, elapsed_offset=None):
task_key = (action_name, full_name)
@@ -280,7 +282,7 @@ class State():
"Trying to add task '{}:{}' to '{}'".format(action_name, full_name, self.tasks)
if not elapsed_offset:
- elapsed_offset = datetime.datetime.now() - self._session_start
+ elapsed_offset = self.elapsed_time()
task = _Task(self, action_name, full_name, elapsed_offset)
self.tasks[task_key] = task
@@ -330,6 +332,24 @@ class State():
for cb in self._task_failed_cbs:
cb(action_name, full_name, element)
+ # elapsed_time()
+ #
+ # Fetches the current session elapsed time
+ #
+ # Args:
+ # start_time(time): Optional explicit start time, relative to caller.
+ #
+ # Returns:
+ # (timedelta): The amount of time since the start of the session,
+ # discounting any time spent while jobs were suspended if
+ # start_time given relative to the Scheduler
+ #
+ def elapsed_time(self, start_time=None):
+ time_now = datetime.datetime.now()
+ if start_time is None:
+ start_time = self._session_start or time_now
+ return time_now - start_time
+
# _Task
#
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index bbe60c1ec..8f104a226 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -80,6 +80,7 @@ class Stream():
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
self._notification_queue = deque()
+ self._starttime = session_start # Synchronised with Scheduler's relative start time
context.messenger.set_state(self._state)
@@ -1100,7 +1101,7 @@ class Stream():
#
@property
def elapsed_time(self):
- return self._scheduler.elapsed_time()
+ return self._state.elapsed_time(start_time=self._starttime)
# terminate()
#
@@ -1658,12 +1659,14 @@ class Stream():
elif notification.notification_type == NotificationType.TICK:
self._ticker_callback()
elif notification.notification_type == NotificationType.JOB_START:
- self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time)
+ self._state.add_task(notification.job_action, notification.full_name, notification.time)
elif notification.notification_type == NotificationType.JOB_COMPLETE:
self._state.remove_task(notification.job_action, notification.full_name)
if notification.job_status == JobStatus.FAIL:
self._state.fail_task(notification.job_action, notification.full_name,
notification.element)
+ elif notification.notification_type == NotificationType.SCHED_START_TIME:
+ self._starttime = notification.time
else:
raise StreamError("Unrecognised notification type received")