diff options
Diffstat (limited to 'src/buildstream/_scheduler/scheduler.py')
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 83 |
1 files changed, 41 insertions, 42 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 86e3af021..171281bd9 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -76,17 +76,18 @@ class NotificationType(FastEnum): # required. NOTE: The notification object should be lightweight # and all attributes must be picklable. # -class Notification(): - - def __init__(self, - notification_type, - *, - full_name=None, - job_action=None, - job_status=None, - time=None, - element=None, - message=None): +class Notification: + def __init__( + self, + notification_type, + *, + full_name=None, + job_action=None, + job_status=None, + time=None, + element=None, + message=None + ): self.notification_type = notification_type self.full_name = full_name self.job_action = job_action @@ -116,40 +117,36 @@ class Notification(): # interrupt_callback: A callback to handle ^C # ticker_callback: A callback call once per second # -class Scheduler(): - - def __init__(self, context, - start_time, state, notification_queue, notifier): +class Scheduler: + def __init__(self, context, start_time, state, notification_queue, notifier): # # Public members # - self.queues = None # Exposed for the frontend to print summaries - self.context = context # The Context object shared with Queues - self.terminated = False # Whether the scheduler was asked to terminate or has terminated - self.suspended = False # Whether the scheduler is currently suspended + self.queues = None # Exposed for the frontend to print summaries + self.context = context # The Context object shared with Queues + self.terminated = False # Whether the scheduler was asked to terminate or has terminated + self.suspended = False # Whether the scheduler is currently suspended # These are shared with the Job, but should probably be removed or made private in some way. - self.loop = None # Shared for Job access to observe the message queue - self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py + self.loop = None # Shared for Job access to observe the message queue + self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py # # Private members # - self._active_jobs = [] # Jobs currently being run in the scheduler - self._starttime = start_time # Initial application start time - self._suspendtime = None # Session time compensation for suspended state - self._queue_jobs = True # Whether we should continue to queue jobs + self._active_jobs = [] # Jobs currently being run in the scheduler + self._starttime = start_time # Initial application start time + self._suspendtime = None # Session time compensation for suspended state + self._queue_jobs = True # Whether we should continue to queue jobs self._state = state - self._casd_process = None # handle to the casd process for monitoring purpose + self._casd_process = None # handle to the casd process for monitoring purpose # Bidirectional queue to send notifications back to the Scheduler's owner self._notification_queue = notification_queue self._notifier = notifier - self.resources = Resources(context.sched_builders, - context.sched_fetchers, - context.sched_pushers) + self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) # run() # @@ -310,11 +307,13 @@ class Scheduler(): element_info = None # Now check for more jobs - notification = Notification(NotificationType.JOB_COMPLETE, - full_name=job.name, - job_action=job.action_name, - job_status=status, - element=element_info) + notification = Notification( + NotificationType.JOB_COMPLETE, + full_name=job.name, + job_action=job.action_name, + job_status=status, + element=element_info, + ) self._notify(notification) self._sched() @@ -360,10 +359,12 @@ class Scheduler(): # def _start_job(self, job): self._active_jobs.append(job) - notification = Notification(NotificationType.JOB_START, - full_name=job.name, - job_action=job.action_name, - time=self._state.elapsed_time(start_time=self._starttime)) + notification = Notification( + NotificationType.JOB_START, + full_name=job.name, + job_action=job.action_name, + time=self._state.elapsed_time(start_time=self._starttime), + ) self._notify(notification) job.start() @@ -399,9 +400,7 @@ class Scheduler(): # to fetch tasks for elements which failed to pull, and # thus need all the pulls to complete before ever starting # a build - ready.extend(chain.from_iterable( - q.harvest_jobs() for q in reversed(self.queues) - )) + ready.extend(chain.from_iterable(q.harvest_jobs() for q in reversed(self.queues))) # harvest_jobs() may have decided to skip some jobs, making # them eligible for promotion to the next queue as a side effect. @@ -471,7 +470,7 @@ class Scheduler(): self.suspended = False # Notify that we're unsuspended self._notify(Notification(NotificationType.SUSPENDED)) - self._starttime += (datetime.datetime.now() - self._suspendtime) + self._starttime += datetime.datetime.now() - self._suspendtime self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime)) self._suspendtime = None |