diff options
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r-- | src/buildstream/_scheduler/jobs/cachesizejob.py | 1 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/cleanupjob.py | 1 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/elementjob.py | 1 | ||||
-rw-r--r-- | src/buildstream/_scheduler/jobs/job.py | 7 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/buildqueue.py | 11 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 36 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 30 |
7 files changed, 52 insertions, 35 deletions
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py index f36c30190..581101c07 100644 --- a/src/buildstream/_scheduler/jobs/cachesizejob.py +++ b/src/buildstream/_scheduler/jobs/cachesizejob.py @@ -22,6 +22,7 @@ from .job import Job, JobStatus, ChildJob class CacheSizeJob(Job): def __init__(self, *args, complete_cb, **kwargs): super().__init__(*args, **kwargs) + self.set_name(self.action_name) self._complete_cb = complete_cb context = self._scheduler.context diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py index 85722c83f..3e9a8ff47 100644 --- a/src/buildstream/_scheduler/jobs/cleanupjob.py +++ b/src/buildstream/_scheduler/jobs/cleanupjob.py @@ -22,6 +22,7 @@ from .job import Job, JobStatus, ChildJob class CleanupJob(Job): def __init__(self, *args, complete_cb, **kwargs): super().__init__(*args, **kwargs) + self.set_name(self.action_name) self._complete_cb = complete_cb context = self._scheduler.context diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py index a535f55db..138448685 100644 --- a/src/buildstream/_scheduler/jobs/elementjob.py +++ b/src/buildstream/_scheduler/jobs/elementjob.py @@ -67,6 +67,7 @@ from .job import Job, ChildJob class ElementJob(Job): def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs): super().__init__(*args, **kwargs) + self.set_name(element._get_full_name()) self.queue = queue self._element = element self._action_cb = action_cb # The action callable function diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index 1c2726b5f..87f461939 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -129,6 +129,7 @@ class Job(): # # Public members # + self.name = None # The name of the job, set by the job's subclass self.action_name = action_name # The action name for the Queue self.child_data = None # Data to be sent to the main process @@ -150,6 +151,12 @@ class Job(): self._message_unique_id = None self._task_id = None + # set_name() + # + # Sets the name of this job + def set_name(self, name): + self.name = name + # start() # # Starts the job. diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index 1dd45607b..1be3f7cd0 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -21,7 +21,7 @@ from datetime import timedelta from . import Queue, QueueStatus -from ..jobs import ElementJob, JobStatus +from ..jobs import JobStatus from ..resources import ResourceType from ..._message import MessageType @@ -55,14 +55,9 @@ class BuildQueue(Queue): detail=detail, action_name=self.action_name, elapsed=timedelta(seconds=0), logfile=logfile) - job = ElementJob(self._scheduler, self.action_name, - logfile, element=element, queue=self, - action_cb=self.get_process_func(), - complete_cb=self._job_done, - max_retries=self._max_retries) self._done_queue.append(element) - self.failed_elements.append(element) - self._scheduler._job_complete_callback(job, False) + element_name = element._get_full_name() + self._task_group.add_failed_task(element_name) return super().enqueue(to_queue) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index adc1cd467..538b2b9d1 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -63,13 +63,6 @@ class Queue(): def __init__(self, scheduler): # - # Public members - # - self.failed_elements = [] # List of failed elements, for the frontend - self.processed_elements_count = 0 # Number of processed elements, for the frontend - self.skipped_elements_count = 0 # Number of skipped elements, for the frontend - - # # Private members # self._scheduler = scheduler @@ -87,6 +80,17 @@ class Queue(): if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources: self._max_retries = scheduler.context.sched_network_retries + self._task_group = self._scheduler._state.add_task_group(self.action_name) + + # destroy() + # + # Explicitly performs all cleanup tasks for this queue + # + # Note: Doing this explicitly is preferred to a __del__ method because + # it is not at the mercy of the garbage collector + def destroy(self): + self._scheduler._state.remove_task_group(self.action_name) + ##################################################### # Abstract Methods for Queue implementations # ##################################################### @@ -236,6 +240,12 @@ class Queue(): def set_required_element_check(self): self._required_element_check = True + # any_failed_elements() + # + # Returns whether any elements in this queue have failed their jobs + def any_failed_elements(self): + return any(self._task_group.failed_tasks) + ##################################################### # Private Methods # ##################################################### @@ -298,7 +308,7 @@ class Queue(): # Report error and mark as failed # self._message(element, MessageType.ERROR, "Post processing error", detail=str(e)) - self.failed_elements.append(element) + self._task_group.add_failed_task(element._get_full_name()) # Treat this as a task error as it's related to a task # even though it did not occur in the task context @@ -314,18 +324,18 @@ class Queue(): self._message(element, MessageType.BUG, "Unhandled exception in post processing", detail=traceback.format_exc()) - self.failed_elements.append(element) + self._task_group.add_failed_task(element._get_full_name()) else: # All elements get placed on the done queue for later processing. self._done_queue.append(element) # These lists are for bookkeeping purposes for the UI and logging. if status == JobStatus.SKIPPED or job.get_terminated(): - self.skipped_elements_count += 1 + self._task_group.add_skipped_task() elif status == JobStatus.OK: - self.processed_elements_count += 1 + self._task_group.add_processed_task() else: - self.failed_elements.append(element) + self._task_group.add_failed_task(element._get_full_name()) # Convenience wrapper for Queue implementations to send # a message for the element they are processing @@ -357,7 +367,7 @@ class Queue(): status = self.status(element) if status == QueueStatus.SKIP: # Place skipped elements into the done queue immediately - self.skipped_elements_count += 1 # Public record of skipped elements + self._task_group.add_skipped_task() self._done_queue.append(element) # Elements to proceed to the next queue elif status == QueueStatus.READY: # Push elements which are ready to be processed immediately into the queue diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 17d655ce2..d2ea2741e 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -28,7 +28,7 @@ from contextlib import contextmanager # Local imports from .resources import Resources, ResourceType -from .jobs import JobStatus, CacheSizeJob, CleanupJob +from .jobs import JobStatus, CacheSizeJob, CleanupJob, ElementJob from .._profile import Topics, PROFILER @@ -61,19 +61,16 @@ _ACTION_NAME_CACHE_SIZE = 'size' # Args: # context: The Context in the parent scheduling process # start_time: The time at which the session started +# state: The state that can be made available to the frontend # interrupt_callback: A callback to handle ^C # ticker_callback: A callback call once per second -# job_start_callback: A callback call when each job starts -# job_complete_callback: A callback call when each job completes # class Scheduler(): def __init__(self, context, - start_time, + start_time, state, interrupt_callback=None, - ticker_callback=None, - job_start_callback=None, - job_complete_callback=None): + ticker_callback=None): # # Public members @@ -94,6 +91,7 @@ class Scheduler(): self._starttime = start_time # Initial application start time self._suspendtime = None # Session time compensation for suspended state self._queue_jobs = True # Whether we should continue to queue jobs + self._state = state # State of cache management related jobs self._cache_size_scheduled = False # Whether we have a cache size job scheduled @@ -104,8 +102,6 @@ class Scheduler(): # Callbacks to report back to the Scheduler owner self._interrupt_callback = interrupt_callback self._ticker_callback = ticker_callback - self._job_start_callback = job_start_callback - self._job_complete_callback = job_complete_callback # Whether our exclusive jobs, like 'cleanup' are currently already # waiting or active. @@ -163,7 +159,7 @@ class Scheduler(): # Stop handling unix signals self._disconnect_signals() - failed = any(any(queue.failed_elements) for queue in self.queues) + failed = any(queue.any_failed_elements() for queue in self.queues) self.loop = None if failed: @@ -184,6 +180,9 @@ class Scheduler(): # def clear_queues(self): if self.queues: + for queue in self.queues: + queue.destroy() + self.queues.clear() # terminate_jobs() @@ -261,8 +260,12 @@ class Scheduler(): # Remove from the active jobs list self._active_jobs.remove(job) - # Scheduler owner facing callback - self._job_complete_callback(job, status) + self._state.remove_task(job.action_name, job.name) + if status == JobStatus.FAIL: + unique_id = None + if isinstance(job, ElementJob): + unique_id = job._element._unique_id + self._state.fail_task(job.action_name, job.name, unique_id) # Now check for more jobs self._sched() @@ -321,8 +324,7 @@ class Scheduler(): # def _start_job(self, job): self._active_jobs.append(job) - if self._job_start_callback: - self._job_start_callback(job) + self._state.add_task(job.action_name, job.name, self.elapsed_time()) job.start() # Callback for the cache size job |