diff options
author | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2019-01-07 18:50:23 +0000 |
---|---|---|
committer | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2019-01-07 18:50:23 +0000 |
commit | 3e3984adf2bee0d5804a1762895002ca3e29cb33 (patch) | |
tree | 28aa95e4046b0f5d6d61e8a52795c40c6c0dee94 | |
parent | 5de42d43061160ddcd62e8bb6ea3dc3ea151ab07 (diff) | |
parent | c2fc2a5ea468c52e02886bac4ba890b4bce8f4b5 (diff) | |
download | buildstream-3e3984adf2bee0d5804a1762895002ca3e29cb33.tar.gz |
Merge branch 'tristan/one-cache-size-job' into 'master'
Only queue one cache size job
Closes #753
See merge request BuildStream/buildstream!1040
-rw-r--r-- | buildstream/_frontend/app.py | 6 | ||||
-rw-r--r-- | buildstream/_scheduler/__init__.py | 2 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/__init__.py | 1 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/cachesizejob.py | 6 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/cleanupjob.py | 6 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/elementjob.py | 6 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 52 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 6 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 5 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pullqueue.py | 8 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 19 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/trackqueue.py | 5 | ||||
-rw-r--r-- | buildstream/_scheduler/scheduler.py | 93 |
13 files changed, 133 insertions, 82 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index 063ca1ec2..af38ae901 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages from .._stream import Stream from .._versions import BST_FORMAT_VERSION from .. import _yaml -from .._scheduler import ElementJob +from .._scheduler import ElementJob, JobStatus # Import frontend assets from . import Profile, LogLine, Status @@ -515,13 +515,13 @@ class App(): self._status.add_job(job) self._maybe_render_status() - def _job_completed(self, job, success): + def _job_completed(self, job, status): self._status.remove_job(job) self._maybe_render_status() # Dont attempt to handle a failure if the user has already opted to # terminate - if not success and not self.stream.terminated: + if status == JobStatus.FAIL and not self.stream.terminated: if isinstance(job, ElementJob): element = job.element diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py index b6e3eeb94..470859864 100644 --- a/buildstream/_scheduler/__init__.py +++ b/buildstream/_scheduler/__init__.py @@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue from .queues.pullqueue import PullQueue from .scheduler import Scheduler, SchedStatus -from .jobs import ElementJob +from .jobs import ElementJob, JobStatus diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py index 4b0b11dac..3e213171a 100644 --- a/buildstream/_scheduler/jobs/__init__.py +++ b/buildstream/_scheduler/jobs/__init__.py @@ -20,3 +20,4 @@ from .elementjob import ElementJob from .cachesizejob import CacheSizeJob from .cleanupjob import CleanupJob +from .job import JobStatus diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py index d46fd4c16..6e4698af9 100644 --- a/buildstream/_scheduler/jobs/cachesizejob.py +++ b/buildstream/_scheduler/jobs/cachesizejob.py @@ -16,7 +16,7 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -from .job import Job +from .job import Job, JobStatus class CacheSizeJob(Job): @@ -30,8 +30,8 @@ class CacheSizeJob(Job): def child_process(self): return self._artifacts.compute_cache_size() - def parent_complete(self, success, result): - if success: + def parent_complete(self, status, result): + if status == JobStatus.OK: self._artifacts.set_cache_size(result) if self._complete_cb: diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py index 8bdbba0ed..e579e9718 100644 --- a/buildstream/_scheduler/jobs/cleanupjob.py +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -16,7 +16,7 @@ # Author: # Tristan Daniël Maat <tristan.maat@codethink.co.uk> # -from .job import Job +from .job import Job, JobStatus class CleanupJob(Job): @@ -29,6 +29,6 @@ class CleanupJob(Job): def child_process(self): return self._artifacts.clean() - def parent_complete(self, success, result): - if success: + def parent_complete(self, status, result): + if status == JobStatus.OK: self._artifacts.set_cache_size(result) diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py index 8ce5c062f..fa0d34fb3 100644 --- a/buildstream/_scheduler/jobs/elementjob.py +++ b/buildstream/_scheduler/jobs/elementjob.py @@ -60,7 +60,7 @@ from .job import Job # Args: # job (Job): The job object which completed # element (Element): The element passed to the Job() constructor -# success (bool): True if the action_cb did not raise an exception +# status (JobStatus): The status of whether the workload raised an exception # result (object): The deserialized object returned by the `action_cb`, or None # if `success` is False # @@ -93,8 +93,8 @@ class ElementJob(Job): # Run the action return self._action_cb(self._element) - def parent_complete(self, success, result): - self._complete_cb(self, self._element, success, self._result) + def parent_complete(self, status, result): + self._complete_cb(self, self._element, status, self._result) def message(self, message_type, message, **kwargs): args = dict(kwargs) diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index a36483b0c..837469a39 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -41,6 +41,22 @@ RC_PERM_FAIL = 2 RC_SKIPPED = 3 +# JobStatus: +# +# The job completion status, passed back through the +# complete callbacks. +# +class JobStatus(): + # Job succeeded + OK = 0 + + # A temporary BstError was raised + FAIL = 1 + + # A SkipJob was raised + SKIPPED = 3 + + # Used to distinguish between status messages and return values class Envelope(): def __init__(self, message_type, message): @@ -116,7 +132,6 @@ class Job(): self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs - self._skipped_flag = False # Indicate whether the job was skipped. self._terminated = False # Whether this job has been explicitly terminated # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries. @@ -273,18 +288,6 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id - # skipped - # - # This will evaluate to True if the job was skipped - # during processing, or if it was forcefully terminated. - # - # Returns: - # (bool): Whether the job should appear as skipped - # - @property - def skipped(self): - return self._skipped_flag or self._terminated - ####################################################### # Abstract Methods # ####################################################### @@ -295,10 +298,10 @@ class Job(): # pass the result to the main thread. # # Args: - # success (bool): Whether the job was successful. + # status (JobStatus): The job exit status # result (any): The result returned by child_process(). # - def parent_complete(self, success, result): + def parent_complete(self, status, result): raise ImplError("Job '{kind}' does not implement parent_complete()" .format(kind=type(self).__name__)) @@ -562,16 +565,23 @@ class Job(): # self._retry_flag = returncode == RC_FAIL - # Set the flag to alert Queue that this job skipped. - self._skipped_flag = returncode == RC_SKIPPED - if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.spawn() return - success = returncode in (RC_OK, RC_SKIPPED) - self.parent_complete(success, self._result) - self._scheduler.job_completed(self, success) + # Resolve the outward facing overall job completion status + # + if returncode == RC_OK: + status = JobStatus.OK + elif returncode == RC_SKIPPED: + status = JobStatus.SKIPPED + elif returncode in (RC_FAIL, RC_PERM_FAIL): + status = JobStatus.FAIL + else: + status = JobStatus.FAIL + + self.parent_complete(status, self._result) + self._scheduler.job_completed(self, status) # Force the deletion of the queue and process objects to try and clean up FDs self._queue = self._process = None diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index d05327557..9d8e7182b 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -21,7 +21,7 @@ from datetime import timedelta from . import Queue, QueueStatus -from ..jobs import ElementJob +from ..jobs import ElementJob, JobStatus from ..resources import ResourceType from ..._message import MessageType @@ -104,7 +104,7 @@ class BuildQueue(Queue): if artifacts.has_quota_exceeded(): self._scheduler.check_cache_size() - def done(self, job, element, result, success): + def done(self, job, element, result, status): # Inform element in main process that assembly is done element._assemble_done() @@ -117,5 +117,5 @@ class BuildQueue(Queue): # artifact cache size for a successful build even though we know a # failed build also grows the artifact cache size. # - if success: + if status == JobStatus.OK: self._check_cache_size(job, element, result) diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index c58bfdb57..fc11fd1d1 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -24,6 +24,7 @@ from ... import Consistency # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..jobs import JobStatus # A queue which fetches element sources @@ -66,9 +67,9 @@ class FetchQueue(Queue): return QueueStatus.READY - def done(self, _, element, result, success): + def done(self, _, element, result, status): - if not success: + if status == JobStatus.FAIL: return element._update_state() diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index b861373a9..dbeb806e5 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..jobs import JobStatus from ..._exceptions import SkipJob @@ -54,9 +55,9 @@ class PullQueue(Queue): else: return QueueStatus.SKIP - def done(self, _, element, result, success): + def done(self, _, element, result, status): - if not success: + if status == JobStatus.FAIL: return element._pull_done() @@ -64,4 +65,5 @@ class PullQueue(Queue): # Build jobs will check the "approximate" size first. Since we # do not get an artifact size from pull jobs, we have to # actually check the cache size. - self._scheduler.check_cache_size() + if status == JobStatus.OK: + self._scheduler.check_cache_size() diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 055e2f84b..81760ace4 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -25,7 +25,7 @@ from enum import Enum import traceback # Local imports -from ..jobs import ElementJob +from ..jobs import ElementJob, JobStatus from ..resources import ResourceType # BuildStream toplevel imports @@ -133,10 +133,9 @@ class Queue(): # job (Job): The job which completed processing # element (Element): The element which completed processing # result (any): The return value of the process() implementation - # success (bool): True if the process() implementation did not - # raise any exception + # status (JobStatus): The return status of the Job # - def done(self, job, element, result, success): + def done(self, job, element, result, status): pass ##################################################### @@ -291,7 +290,7 @@ class Queue(): # # See the Job object for an explanation of the call signature # - def _job_done(self, job, element, success, result): + def _job_done(self, job, element, status, result): # Update values that need to be synchronized in the main task # before calling any queue implementation @@ -301,7 +300,7 @@ class Queue(): # and determine if it should be considered as processed # or skipped. try: - self.done(job, element, result, success) + self.done(job, element, result, status) except BstError as e: # Report error and mark as failed @@ -332,12 +331,10 @@ class Queue(): # All jobs get placed on the done queue for later processing. self._done_queue.append(job) - # A Job can be skipped whether or not it has failed, - # we want to only bookkeep them as processed or failed - # if they are not skipped. - if job.skipped: + # These lists are for bookkeeping purposes for the UI and logging. + if status == JobStatus.SKIPPED: self.skipped_elements.append(element) - elif success: + elif status == JobStatus.OK: self.processed_elements.append(element) else: self.failed_elements.append(element) diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index 514418086..245b528e6 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..jobs import JobStatus # A queue which tracks sources @@ -47,9 +48,9 @@ class TrackQueue(Queue): return QueueStatus.READY - def done(self, _, element, result, success): + def done(self, _, element, result, status): - if not success: + if status == JobStatus.FAIL: return # Set the new refs in the main process one by one as they complete diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index b76c7308e..97335de34 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -38,6 +38,16 @@ class SchedStatus(): TERMINATED = 1 +# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones +# which we launch dynamically, they have the property of being +# meaningless to queue if one is already queued, and it also +# doesnt make sense to run them in parallel +# +_ACTION_NAME_CLEANUP = 'cleanup' +_ACTION_NAME_CACHE_SIZE = 'cache_size' +_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE] + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -94,6 +104,15 @@ class Scheduler(): self._suspendtime = None self._queue_jobs = True # Whether we should continue to queue jobs + # Whether our exclusive jobs, like 'cleanup' are currently already + # waiting or active. + # + # This is just a bit quicker than scanning the wait queue and active + # queue and comparing job action names. + # + self._exclusive_waiting = set() + self._exclusive_active = set() + self._resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) @@ -211,19 +230,6 @@ class Scheduler(): starttime = timenow return timenow - starttime - # schedule_jobs() - # - # Args: - # jobs ([Job]): A list of jobs to schedule - # - # Schedule 'Job's for the scheduler to run. Jobs scheduled will be - # run as soon any other queueing jobs finish, provided sufficient - # resources are available for them to run - # - def schedule_jobs(self, jobs): - for job in jobs: - self.waiting_jobs.append(job) - # job_completed(): # # Called when a Job completes @@ -231,12 +237,14 @@ class Scheduler(): # Args: # queue (Queue): The Queue holding a complete job # job (Job): The completed Job - # success (bool): Whether the Job completed with a success status + # status (JobStatus): The status of the completed job # - def job_completed(self, job, success): + def job_completed(self, job, status): self._resources.clear_job_resources(job) self.active_jobs.remove(job) - self._job_complete_callback(job, success) + if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS: + self._exclusive_active.remove(job.action_name) + self._job_complete_callback(job, status) self._schedule_queue_jobs() self._sched() @@ -246,18 +254,13 @@ class Scheduler(): # size is calculated, a cleanup job will be run automatically # if needed. # - # FIXME: This should ensure that only one cache size job - # is ever pending at a given time. If a cache size - # job is already running, it is correct to queue - # a new one, it is incorrect to have more than one - # of these jobs pending at a given time, though. - # def check_cache_size(self): - job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size', + job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE, + 'cache_size/cache_size', resources=[ResourceType.CACHE, ResourceType.PROCESS], complete_cb=self._run_cleanup) - self.schedule_jobs([job]) + self._schedule_jobs([job]) ####################################################### # Local Private Methods # @@ -276,10 +279,19 @@ class Scheduler(): if not self._resources.reserve_job_resources(job): continue + # Postpone these jobs if one is already running + if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \ + job.action_name in self._exclusive_active: + continue + job.spawn() self.waiting_jobs.remove(job) self.active_jobs.append(job) + if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS: + self._exclusive_waiting.remove(job.action_name) + self._exclusive_active.add(job.action_name) + if self._job_start_callback: self._job_start_callback(job) @@ -287,6 +299,33 @@ class Scheduler(): if not self.active_jobs and not self.waiting_jobs: self.loop.stop() + # _schedule_jobs() + # + # The main entry point for jobs to be scheduled. + # + # This is called either as a result of scanning the queues + # in _schedule_queue_jobs(), or directly by the Scheduler + # to insert special jobs like cleanups. + # + # Args: + # jobs ([Job]): A list of jobs to schedule + # + def _schedule_jobs(self, jobs): + for job in jobs: + + # Special treatment of our redundant exclusive jobs + # + if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS: + + # Drop the job if one is already queued + if job.action_name in self._exclusive_waiting: + continue + + # Mark this action type as queued + self._exclusive_waiting.add(job.action_name) + + self.waiting_jobs.append(job) + # _schedule_queue_jobs() # # Ask the queues what jobs they want to schedule and schedule @@ -331,7 +370,7 @@ class Scheduler(): # the next queue and process them. process_queues = any(q.dequeue_ready() for q in self.queues) - self.schedule_jobs(ready) + self._schedule_jobs(ready) self._sched() # _run_cleanup() @@ -353,11 +392,11 @@ class Scheduler(): if not artifacts.has_quota_exceeded(): return - job = CleanupJob(self, 'cleanup', 'cleanup/cleanup', + job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup', resources=[ResourceType.CACHE, ResourceType.PROCESS], exclusive_resources=[ResourceType.CACHE]) - self.schedule_jobs([job]) + self._schedule_jobs([job]) # _suspend_jobs() # |