summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler/scheduler.py')
-rw-r--r--src/buildstream/_scheduler/scheduler.py83
1 files changed, 41 insertions, 42 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 86e3af021..171281bd9 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -76,17 +76,18 @@ class NotificationType(FastEnum):
# required. NOTE: The notification object should be lightweight
# and all attributes must be picklable.
#
-class Notification():
-
- def __init__(self,
- notification_type,
- *,
- full_name=None,
- job_action=None,
- job_status=None,
- time=None,
- element=None,
- message=None):
+class Notification:
+ def __init__(
+ self,
+ notification_type,
+ *,
+ full_name=None,
+ job_action=None,
+ job_status=None,
+ time=None,
+ element=None,
+ message=None
+ ):
self.notification_type = notification_type
self.full_name = full_name
self.job_action = job_action
@@ -116,40 +117,36 @@ class Notification():
# interrupt_callback: A callback to handle ^C
# ticker_callback: A callback call once per second
#
-class Scheduler():
-
- def __init__(self, context,
- start_time, state, notification_queue, notifier):
+class Scheduler:
+ def __init__(self, context, start_time, state, notification_queue, notifier):
#
# Public members
#
- self.queues = None # Exposed for the frontend to print summaries
- self.context = context # The Context object shared with Queues
- self.terminated = False # Whether the scheduler was asked to terminate or has terminated
- self.suspended = False # Whether the scheduler is currently suspended
+ self.queues = None # Exposed for the frontend to print summaries
+ self.context = context # The Context object shared with Queues
+ self.terminated = False # Whether the scheduler was asked to terminate or has terminated
+ self.suspended = False # Whether the scheduler is currently suspended
# These are shared with the Job, but should probably be removed or made private in some way.
- self.loop = None # Shared for Job access to observe the message queue
- self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
+ self.loop = None # Shared for Job access to observe the message queue
+ self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
#
# Private members
#
- self._active_jobs = [] # Jobs currently being run in the scheduler
- self._starttime = start_time # Initial application start time
- self._suspendtime = None # Session time compensation for suspended state
- self._queue_jobs = True # Whether we should continue to queue jobs
+ self._active_jobs = [] # Jobs currently being run in the scheduler
+ self._starttime = start_time # Initial application start time
+ self._suspendtime = None # Session time compensation for suspended state
+ self._queue_jobs = True # Whether we should continue to queue jobs
self._state = state
- self._casd_process = None # handle to the casd process for monitoring purpose
+ self._casd_process = None # handle to the casd process for monitoring purpose
# Bidirectional queue to send notifications back to the Scheduler's owner
self._notification_queue = notification_queue
self._notifier = notifier
- self.resources = Resources(context.sched_builders,
- context.sched_fetchers,
- context.sched_pushers)
+ self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
# run()
#
@@ -310,11 +307,13 @@ class Scheduler():
element_info = None
# Now check for more jobs
- notification = Notification(NotificationType.JOB_COMPLETE,
- full_name=job.name,
- job_action=job.action_name,
- job_status=status,
- element=element_info)
+ notification = Notification(
+ NotificationType.JOB_COMPLETE,
+ full_name=job.name,
+ job_action=job.action_name,
+ job_status=status,
+ element=element_info,
+ )
self._notify(notification)
self._sched()
@@ -360,10 +359,12 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- notification = Notification(NotificationType.JOB_START,
- full_name=job.name,
- job_action=job.action_name,
- time=self._state.elapsed_time(start_time=self._starttime))
+ notification = Notification(
+ NotificationType.JOB_START,
+ full_name=job.name,
+ job_action=job.action_name,
+ time=self._state.elapsed_time(start_time=self._starttime),
+ )
self._notify(notification)
job.start()
@@ -399,9 +400,7 @@ class Scheduler():
# to fetch tasks for elements which failed to pull, and
# thus need all the pulls to complete before ever starting
# a build
- ready.extend(chain.from_iterable(
- q.harvest_jobs() for q in reversed(self.queues)
- ))
+ ready.extend(chain.from_iterable(q.harvest_jobs() for q in reversed(self.queues)))
# harvest_jobs() may have decided to skip some jobs, making
# them eligible for promotion to the next queue as a side effect.
@@ -471,7 +470,7 @@ class Scheduler():
self.suspended = False
# Notify that we're unsuspended
self._notify(Notification(NotificationType.SUSPENDED))
- self._starttime += (datetime.datetime.now() - self._suspendtime)
+ self._starttime += datetime.datetime.now() - self._suspendtime
self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
self._suspendtime = None