summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler/scheduler.py')
-rw-r--r--src/buildstream/_scheduler/scheduler.py30
1 files changed, 16 insertions, 14 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 17d655ce2..d2ea2741e 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
-from .jobs import JobStatus, CacheSizeJob, CleanupJob
+from .jobs import JobStatus, CacheSizeJob, CleanupJob, ElementJob
from .._profile import Topics, PROFILER
@@ -61,19 +61,16 @@ _ACTION_NAME_CACHE_SIZE = 'size'
# Args:
# context: The Context in the parent scheduling process
# start_time: The time at which the session started
+# state: The state that can be made available to the frontend
# interrupt_callback: A callback to handle ^C
# ticker_callback: A callback call once per second
-# job_start_callback: A callback call when each job starts
-# job_complete_callback: A callback call when each job completes
#
class Scheduler():
def __init__(self, context,
- start_time,
+ start_time, state,
interrupt_callback=None,
- ticker_callback=None,
- job_start_callback=None,
- job_complete_callback=None):
+ ticker_callback=None):
#
# Public members
@@ -94,6 +91,7 @@ class Scheduler():
self._starttime = start_time # Initial application start time
self._suspendtime = None # Session time compensation for suspended state
self._queue_jobs = True # Whether we should continue to queue jobs
+ self._state = state
# State of cache management related jobs
self._cache_size_scheduled = False # Whether we have a cache size job scheduled
@@ -104,8 +102,6 @@ class Scheduler():
# Callbacks to report back to the Scheduler owner
self._interrupt_callback = interrupt_callback
self._ticker_callback = ticker_callback
- self._job_start_callback = job_start_callback
- self._job_complete_callback = job_complete_callback
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
@@ -163,7 +159,7 @@ class Scheduler():
# Stop handling unix signals
self._disconnect_signals()
- failed = any(any(queue.failed_elements) for queue in self.queues)
+ failed = any(queue.any_failed_elements() for queue in self.queues)
self.loop = None
if failed:
@@ -184,6 +180,9 @@ class Scheduler():
#
def clear_queues(self):
if self.queues:
+ for queue in self.queues:
+ queue.destroy()
+
self.queues.clear()
# terminate_jobs()
@@ -261,8 +260,12 @@ class Scheduler():
# Remove from the active jobs list
self._active_jobs.remove(job)
- # Scheduler owner facing callback
- self._job_complete_callback(job, status)
+ self._state.remove_task(job.action_name, job.name)
+ if status == JobStatus.FAIL:
+ unique_id = None
+ if isinstance(job, ElementJob):
+ unique_id = job._element._unique_id
+ self._state.fail_task(job.action_name, job.name, unique_id)
# Now check for more jobs
self._sched()
@@ -321,8 +324,7 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- if self._job_start_callback:
- self._job_start_callback(job)
+ self._state.add_task(job.action_name, job.name, self.elapsed_time())
job.start()
# Callback for the cache size job