summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler')
-rw-r--r--src/buildstream/_scheduler/jobs/cachesizejob.py1
-rw-r--r--src/buildstream/_scheduler/jobs/cleanupjob.py1
-rw-r--r--src/buildstream/_scheduler/jobs/elementjob.py1
-rw-r--r--src/buildstream/_scheduler/jobs/job.py7
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py11
-rw-r--r--src/buildstream/_scheduler/queues/queue.py36
-rw-r--r--src/buildstream/_scheduler/scheduler.py30
7 files changed, 52 insertions, 35 deletions
diff --git a/src/buildstream/_scheduler/jobs/cachesizejob.py b/src/buildstream/_scheduler/jobs/cachesizejob.py
index f36c30190..581101c07 100644
--- a/src/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/src/buildstream/_scheduler/jobs/cachesizejob.py
@@ -22,6 +22,7 @@ from .job import Job, JobStatus, ChildJob
class CacheSizeJob(Job):
def __init__(self, *args, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
+ self.set_name(self.action_name)
self._complete_cb = complete_cb
context = self._scheduler.context
diff --git a/src/buildstream/_scheduler/jobs/cleanupjob.py b/src/buildstream/_scheduler/jobs/cleanupjob.py
index 85722c83f..3e9a8ff47 100644
--- a/src/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/src/buildstream/_scheduler/jobs/cleanupjob.py
@@ -22,6 +22,7 @@ from .job import Job, JobStatus, ChildJob
class CleanupJob(Job):
def __init__(self, *args, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
+ self.set_name(self.action_name)
self._complete_cb = complete_cb
context = self._scheduler.context
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index a535f55db..138448685 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -67,6 +67,7 @@ from .job import Job, ChildJob
class ElementJob(Job):
def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
super().__init__(*args, **kwargs)
+ self.set_name(element._get_full_name())
self.queue = queue
self._element = element
self._action_cb = action_cb # The action callable function
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 1c2726b5f..87f461939 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -129,6 +129,7 @@ class Job():
#
# Public members
#
+ self.name = None # The name of the job, set by the job's subclass
self.action_name = action_name # The action name for the Queue
self.child_data = None # Data to be sent to the main process
@@ -150,6 +151,12 @@ class Job():
self._message_unique_id = None
self._task_id = None
+ # set_name()
+ #
+ # Sets the name of this job
+ def set_name(self, name):
+ self.name = name
+
# start()
#
# Starts the job.
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index 1dd45607b..1be3f7cd0 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -21,7 +21,7 @@
from datetime import timedelta
from . import Queue, QueueStatus
-from ..jobs import ElementJob, JobStatus
+from ..jobs import JobStatus
from ..resources import ResourceType
from ..._message import MessageType
@@ -55,14 +55,9 @@ class BuildQueue(Queue):
detail=detail, action_name=self.action_name,
elapsed=timedelta(seconds=0),
logfile=logfile)
- job = ElementJob(self._scheduler, self.action_name,
- logfile, element=element, queue=self,
- action_cb=self.get_process_func(),
- complete_cb=self._job_done,
- max_retries=self._max_retries)
self._done_queue.append(element)
- self.failed_elements.append(element)
- self._scheduler._job_complete_callback(job, False)
+ element_name = element._get_full_name()
+ self._task_group.add_failed_task(element_name)
return super().enqueue(to_queue)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index adc1cd467..538b2b9d1 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -63,13 +63,6 @@ class Queue():
def __init__(self, scheduler):
#
- # Public members
- #
- self.failed_elements = [] # List of failed elements, for the frontend
- self.processed_elements_count = 0 # Number of processed elements, for the frontend
- self.skipped_elements_count = 0 # Number of skipped elements, for the frontend
-
- #
# Private members
#
self._scheduler = scheduler
@@ -87,6 +80,17 @@ class Queue():
if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources:
self._max_retries = scheduler.context.sched_network_retries
+ self._task_group = self._scheduler._state.add_task_group(self.action_name)
+
+ # destroy()
+ #
+ # Explicitly performs all cleanup tasks for this queue
+ #
+ # Note: Doing this explicitly is preferred to a __del__ method because
+ # it is not at the mercy of the garbage collector
+ def destroy(self):
+ self._scheduler._state.remove_task_group(self.action_name)
+
#####################################################
# Abstract Methods for Queue implementations #
#####################################################
@@ -236,6 +240,12 @@ class Queue():
def set_required_element_check(self):
self._required_element_check = True
+ # any_failed_elements()
+ #
+ # Returns whether any elements in this queue have failed their jobs
+ def any_failed_elements(self):
+ return any(self._task_group.failed_tasks)
+
#####################################################
# Private Methods #
#####################################################
@@ -298,7 +308,7 @@ class Queue():
# Report error and mark as failed
#
self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
- self.failed_elements.append(element)
+ self._task_group.add_failed_task(element._get_full_name())
# Treat this as a task error as it's related to a task
# even though it did not occur in the task context
@@ -314,18 +324,18 @@ class Queue():
self._message(element, MessageType.BUG,
"Unhandled exception in post processing",
detail=traceback.format_exc())
- self.failed_elements.append(element)
+ self._task_group.add_failed_task(element._get_full_name())
else:
# All elements get placed on the done queue for later processing.
self._done_queue.append(element)
# These lists are for bookkeeping purposes for the UI and logging.
if status == JobStatus.SKIPPED or job.get_terminated():
- self.skipped_elements_count += 1
+ self._task_group.add_skipped_task()
elif status == JobStatus.OK:
- self.processed_elements_count += 1
+ self._task_group.add_processed_task()
else:
- self.failed_elements.append(element)
+ self._task_group.add_failed_task(element._get_full_name())
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
@@ -357,7 +367,7 @@ class Queue():
status = self.status(element)
if status == QueueStatus.SKIP:
# Place skipped elements into the done queue immediately
- self.skipped_elements_count += 1 # Public record of skipped elements
+ self._task_group.add_skipped_task()
self._done_queue.append(element) # Elements to proceed to the next queue
elif status == QueueStatus.READY:
# Push elements which are ready to be processed immediately into the queue
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 17d655ce2..d2ea2741e 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
# Local imports
from .resources import Resources, ResourceType
-from .jobs import JobStatus, CacheSizeJob, CleanupJob
+from .jobs import JobStatus, CacheSizeJob, CleanupJob, ElementJob
from .._profile import Topics, PROFILER
@@ -61,19 +61,16 @@ _ACTION_NAME_CACHE_SIZE = 'size'
# Args:
# context: The Context in the parent scheduling process
# start_time: The time at which the session started
+# state: The state that can be made available to the frontend
# interrupt_callback: A callback to handle ^C
# ticker_callback: A callback call once per second
-# job_start_callback: A callback call when each job starts
-# job_complete_callback: A callback call when each job completes
#
class Scheduler():
def __init__(self, context,
- start_time,
+ start_time, state,
interrupt_callback=None,
- ticker_callback=None,
- job_start_callback=None,
- job_complete_callback=None):
+ ticker_callback=None):
#
# Public members
@@ -94,6 +91,7 @@ class Scheduler():
self._starttime = start_time # Initial application start time
self._suspendtime = None # Session time compensation for suspended state
self._queue_jobs = True # Whether we should continue to queue jobs
+ self._state = state
# State of cache management related jobs
self._cache_size_scheduled = False # Whether we have a cache size job scheduled
@@ -104,8 +102,6 @@ class Scheduler():
# Callbacks to report back to the Scheduler owner
self._interrupt_callback = interrupt_callback
self._ticker_callback = ticker_callback
- self._job_start_callback = job_start_callback
- self._job_complete_callback = job_complete_callback
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
@@ -163,7 +159,7 @@ class Scheduler():
# Stop handling unix signals
self._disconnect_signals()
- failed = any(any(queue.failed_elements) for queue in self.queues)
+ failed = any(queue.any_failed_elements() for queue in self.queues)
self.loop = None
if failed:
@@ -184,6 +180,9 @@ class Scheduler():
#
def clear_queues(self):
if self.queues:
+ for queue in self.queues:
+ queue.destroy()
+
self.queues.clear()
# terminate_jobs()
@@ -261,8 +260,12 @@ class Scheduler():
# Remove from the active jobs list
self._active_jobs.remove(job)
- # Scheduler owner facing callback
- self._job_complete_callback(job, status)
+ self._state.remove_task(job.action_name, job.name)
+ if status == JobStatus.FAIL:
+ unique_id = None
+ if isinstance(job, ElementJob):
+ unique_id = job._element._unique_id
+ self._state.fail_task(job.action_name, job.name, unique_id)
# Now check for more jobs
self._sched()
@@ -321,8 +324,7 @@ class Scheduler():
#
def _start_job(self, job):
self._active_jobs.append(job)
- if self._job_start_callback:
- self._job_start_callback(job)
+ self._state.add_task(job.action_name, job.name, self.elapsed_time())
job.start()
# Callback for the cache size job