diff options
author | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-08-20 16:36:53 +0100 |
---|---|---|
committer | bst-marge-bot <marge-bot@buildstream.build> | 2019-09-10 10:44:53 +0000 |
commit | be757fab5e8dc3d603e8f61e6ffbbf00053467a6 (patch) | |
tree | 464d78dc7f3e89af0c01b519850c0a77e8c6f6be | |
parent | 56365356a20540c3319107fe647d0852321a64cb (diff) | |
download | buildstream-be757fab5e8dc3d603e8f61e6ffbbf00053467a6.tar.gz |
scheduler.py: Move elapsed_time() to shared State()
The starttime relative to the scheduler, used to track deltas for
time spent not suspended across Stream--Scheduler is now synced via
a notification. This removes the need to call into the scheduler from
the 'frontend' to determine the time difference for scheduler relative
queries.
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 23 | ||||
-rw-r--r-- | src/buildstream/_state.py | 24 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 7 |
3 files changed, 32 insertions, 22 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index e6a12e81c..7ac0559b7 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -54,6 +54,7 @@ class NotificationType(FastEnum): TICK = "tick" TERMINATE = "terminate" QUIT = "quit" + SCHED_START_TIME = "sched_start_time" # Notification() @@ -72,13 +73,13 @@ class Notification(): full_name=None, job_action=None, job_status=None, - elapsed_time=None, + time=None, element=None): self.notification_type = notification_type self.full_name = full_name self.job_action = job_action self.job_status = job_status - self.elapsed_time = elapsed_time + self.time = time self.element = element @@ -249,21 +250,6 @@ class Scheduler(): def stop_queueing(self): self._queue_jobs = False - # elapsed_time() - # - # Fetches the current session elapsed time - # - # Returns: - # (timedelta): The amount of time since the start of the session, - # discounting any time spent while jobs were suspended. - # - def elapsed_time(self): - timenow = datetime.datetime.now() - starttime = self._starttime - if not starttime: - starttime = timenow - return timenow - starttime - # job_completed(): # # Called when a Job completes @@ -313,7 +299,7 @@ class Scheduler(): notification = Notification(NotificationType.JOB_START, full_name=job.name, job_action=job.action_name, - elapsed_time=self.elapsed_time()) + time=self._state.elapsed_time(start_time=self._starttime)) self._notify(notification) job.start() @@ -411,6 +397,7 @@ class Scheduler(): job.resume() self.suspended = False self._starttime += (datetime.datetime.now() - self._suspendtime) + self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime)) self._suspendtime = None # _interrupt_event(): diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py index df3bceff2..c99434018 100644 --- a/src/buildstream/_state.py +++ b/src/buildstream/_state.py @@ -272,7 +272,9 @@ class State(): # it from other tasks with the same action name # e.g. an element's name. # elapsed_offset (timedelta): (Optional) The time the task started, relative - # to buildstream's start time. + # to buildstream's start time. Note scheduler tasks + # use this as they don't report relative to wallclock time + # if the Scheduler has been suspended. # def add_task(self, action_name, full_name, elapsed_offset=None): task_key = (action_name, full_name) @@ -280,7 +282,7 @@ class State(): "Trying to add task '{}:{}' to '{}'".format(action_name, full_name, self.tasks) if not elapsed_offset: - elapsed_offset = datetime.datetime.now() - self._session_start + elapsed_offset = self.elapsed_time() task = _Task(self, action_name, full_name, elapsed_offset) self.tasks[task_key] = task @@ -330,6 +332,24 @@ class State(): for cb in self._task_failed_cbs: cb(action_name, full_name, element) + # elapsed_time() + # + # Fetches the current session elapsed time + # + # Args: + # start_time(time): Optional explicit start time, relative to caller. + # + # Returns: + # (timedelta): The amount of time since the start of the session, + # discounting any time spent while jobs were suspended if + # start_time given relative to the Scheduler + # + def elapsed_time(self, start_time=None): + time_now = datetime.datetime.now() + if start_time is None: + start_time = self._session_start or time_now + return time_now - start_time + # _Task # diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index bbe60c1ec..8f104a226 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -80,6 +80,7 @@ class Stream(): self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state self._notification_queue = deque() + self._starttime = session_start # Synchronised with Scheduler's relative start time context.messenger.set_state(self._state) @@ -1100,7 +1101,7 @@ class Stream(): # @property def elapsed_time(self): - return self._scheduler.elapsed_time() + return self._state.elapsed_time(start_time=self._starttime) # terminate() # @@ -1658,12 +1659,14 @@ class Stream(): elif notification.notification_type == NotificationType.TICK: self._ticker_callback() elif notification.notification_type == NotificationType.JOB_START: - self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time) + self._state.add_task(notification.job_action, notification.full_name, notification.time) elif notification.notification_type == NotificationType.JOB_COMPLETE: self._state.remove_task(notification.job_action, notification.full_name) if notification.job_status == JobStatus.FAIL: self._state.fail_task(notification.job_action, notification.full_name, notification.element) + elif notification.notification_type == NotificationType.SCHED_START_TIME: + self._starttime = notification.time else: raise StreamError("Unrecognised notification type received") |