diff options
Diffstat (limited to 'src/buildstream/_scheduler/scheduler.py')
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 30 |
1 files changed, 16 insertions, 14 deletions
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 17d655ce2..d2ea2741e 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -28,7 +28,7 @@ from contextlib import contextmanager # Local imports from .resources import Resources, ResourceType -from .jobs import JobStatus, CacheSizeJob, CleanupJob +from .jobs import JobStatus, CacheSizeJob, CleanupJob, ElementJob from .._profile import Topics, PROFILER @@ -61,19 +61,16 @@ _ACTION_NAME_CACHE_SIZE = 'size' # Args: # context: The Context in the parent scheduling process # start_time: The time at which the session started +# state: The state that can be made available to the frontend # interrupt_callback: A callback to handle ^C # ticker_callback: A callback call once per second -# job_start_callback: A callback call when each job starts -# job_complete_callback: A callback call when each job completes # class Scheduler(): def __init__(self, context, - start_time, + start_time, state, interrupt_callback=None, - ticker_callback=None, - job_start_callback=None, - job_complete_callback=None): + ticker_callback=None): # # Public members @@ -94,6 +91,7 @@ class Scheduler(): self._starttime = start_time # Initial application start time self._suspendtime = None # Session time compensation for suspended state self._queue_jobs = True # Whether we should continue to queue jobs + self._state = state # State of cache management related jobs self._cache_size_scheduled = False # Whether we have a cache size job scheduled @@ -104,8 +102,6 @@ class Scheduler(): # Callbacks to report back to the Scheduler owner self._interrupt_callback = interrupt_callback self._ticker_callback = ticker_callback - self._job_start_callback = job_start_callback - self._job_complete_callback = job_complete_callback # Whether our exclusive jobs, like 'cleanup' are currently already # waiting or active. @@ -163,7 +159,7 @@ class Scheduler(): # Stop handling unix signals self._disconnect_signals() - failed = any(any(queue.failed_elements) for queue in self.queues) + failed = any(queue.any_failed_elements() for queue in self.queues) self.loop = None if failed: @@ -184,6 +180,9 @@ class Scheduler(): # def clear_queues(self): if self.queues: + for queue in self.queues: + queue.destroy() + self.queues.clear() # terminate_jobs() @@ -261,8 +260,12 @@ class Scheduler(): # Remove from the active jobs list self._active_jobs.remove(job) - # Scheduler owner facing callback - self._job_complete_callback(job, status) + self._state.remove_task(job.action_name, job.name) + if status == JobStatus.FAIL: + unique_id = None + if isinstance(job, ElementJob): + unique_id = job._element._unique_id + self._state.fail_task(job.action_name, job.name, unique_id) # Now check for more jobs self._sched() @@ -321,8 +324,7 @@ class Scheduler(): # def _start_job(self, job): self._active_jobs.append(job) - if self._job_start_callback: - self._job_start_callback(job) + self._state.add_task(job.action_name, job.name, self.elapsed_time()) job.start() # Callback for the cache size job |