From 059035b90a8fe6d0fdb861f1f3987d9c2773a137 Mon Sep 17 00:00:00 2001 From: Tristan Van Berkom Date: Sun, 6 Jan 2019 14:21:00 -0500 Subject: _scheduler/scheduler.py: Make _schedule_jobs() private This is not used anywhere outside of the Scheduler, currently only the Scheduler itself is allowed to queue a job at this level. If the highlevel business logic for automatic queueing of auxiliary jobs moves to another location, we can make this public again. --- buildstream/_scheduler/scheduler.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index b76c7308e..8facb085a 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -211,19 +211,6 @@ class Scheduler(): starttime = timenow return timenow - starttime - # schedule_jobs() - # - # Args: - # jobs ([Job]): A list of jobs to schedule - # - # Schedule 'Job's for the scheduler to run. Jobs scheduled will be - # run as soon any other queueing jobs finish, provided sufficient - # resources are available for them to run - # - def schedule_jobs(self, jobs): - for job in jobs: - self.waiting_jobs.append(job) - # job_completed(): # # Called when a Job completes @@ -257,7 +244,7 @@ class Scheduler(): resources=[ResourceType.CACHE, ResourceType.PROCESS], complete_cb=self._run_cleanup) - self.schedule_jobs([job]) + self._schedule_jobs([job]) ####################################################### # Local Private Methods # @@ -287,6 +274,21 @@ class Scheduler(): if not self.active_jobs and not self.waiting_jobs: self.loop.stop() + # _schedule_jobs() + # + # The main entry point for jobs to be scheduled. + # + # This is called either as a result of scanning the queues + # in _schedule_queue_jobs(), or directly by the Scheduler + # to insert special jobs like cleanups. + # + # Args: + # jobs ([Job]): A list of jobs to schedule + # + def _schedule_jobs(self, jobs): + for job in jobs: + self.waiting_jobs.append(job) + # _schedule_queue_jobs() # # Ask the queues what jobs they want to schedule and schedule @@ -331,7 +333,7 @@ class Scheduler(): # the next queue and process them. process_queues = any(q.dequeue_ready() for q in self.queues) - self.schedule_jobs(ready) + self._schedule_jobs(ready) self._sched() # _run_cleanup() @@ -357,7 +359,7 @@ class Scheduler(): resources=[ResourceType.CACHE, ResourceType.PROCESS], exclusive_resources=[ResourceType.CACHE]) - self.schedule_jobs([job]) + self._schedule_jobs([job]) # _suspend_jobs() # -- cgit v1.2.1 From b83d1b1ffa4060c9cd147a79d776e276a283f4a2 Mon Sep 17 00:00:00 2001 From: Tristan Van Berkom Date: Sun, 6 Jan 2019 14:15:46 -0500 Subject: _scheduler/scheduler.py: Only run one cache size job at a time When queuing the special cache management related cleanup and cache size jobs, we now treat these jobs as special and do the following: * Avoid queueing a cleanup/cache_size job if one is already queued We just drop redundantly queued jobs here. * Ensure that jobs of this type only run one at a time This could have been done with the Resources mechanics, however as these special jobs have the same properties and are basically owned by the Scheduler, it seemed more straight forward to handle the behaviors of these special jobs together. This fixes issue #753 --- buildstream/_scheduler/scheduler.py | 53 +++++++++++++++++++++++++++++++------ 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index 8facb085a..ecbfef3ed 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -38,6 +38,16 @@ class SchedStatus(): TERMINATED = 1 +# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones +# which we launch dynamically, they have the property of being +# meaningless to queue if one is already queued, and it also +# doesnt make sense to run them in parallel +# +_ACTION_NAME_CLEANUP = 'cleanup' +_ACTION_NAME_CACHE_SIZE = 'cache_size' +_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE] + + # Scheduler() # # The scheduler operates on a list queues, each of which is meant to accomplish @@ -94,6 +104,15 @@ class Scheduler(): self._suspendtime = None self._queue_jobs = True # Whether we should continue to queue jobs + # Whether our exclusive jobs, like 'cleanup' are currently already + # waiting or active. + # + # This is just a bit quicker than scanning the wait queue and active + # queue and comparing job action names. + # + self._exclusive_waiting = set() + self._exclusive_active = set() + self._resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) @@ -223,6 +242,8 @@ class Scheduler(): def job_completed(self, job, success): self._resources.clear_job_resources(job) self.active_jobs.remove(job) + if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS: + self._exclusive_active.remove(job.action_name) self._job_complete_callback(job, success) self._schedule_queue_jobs() self._sched() @@ -233,14 +254,9 @@ class Scheduler(): # size is calculated, a cleanup job will be run automatically # if needed. # - # FIXME: This should ensure that only one cache size job - # is ever pending at a given time. If a cache size - # job is already running, it is correct to queue - # a new one, it is incorrect to have more than one - # of these jobs pending at a given time, though. - # def check_cache_size(self): - job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size', + job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE, + 'cache_size/cache_size', resources=[ResourceType.CACHE, ResourceType.PROCESS], complete_cb=self._run_cleanup) @@ -263,10 +279,19 @@ class Scheduler(): if not self._resources.reserve_job_resources(job): continue + # Postpone these jobs if one is already running + if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \ + job.action_name in self._exclusive_active: + continue + job.spawn() self.waiting_jobs.remove(job) self.active_jobs.append(job) + if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS: + self._exclusive_waiting.remove(job.action_name) + self._exclusive_active.add(job.action_name) + if self._job_start_callback: self._job_start_callback(job) @@ -287,6 +312,18 @@ class Scheduler(): # def _schedule_jobs(self, jobs): for job in jobs: + + # Special treatment of our redundant exclusive jobs + # + if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS: + + # Drop the job if one is already queued + if job.action_name in self._exclusive_waiting: + continue + + # Mark this action type as queued + self._exclusive_waiting.add(job.action_name) + self.waiting_jobs.append(job) # _schedule_queue_jobs() @@ -355,7 +392,7 @@ class Scheduler(): if not artifacts.has_quota_exceeded(): return - job = CleanupJob(self, 'cleanup', 'cleanup/cleanup', + job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup', resources=[ResourceType.CACHE, ResourceType.PROCESS], exclusive_resources=[ResourceType.CACHE]) -- cgit v1.2.1 From 16a8816f67cc2d78625c646275aaf337af82129d Mon Sep 17 00:00:00 2001 From: Tristan Van Berkom Date: Mon, 7 Jan 2019 11:20:11 -0500 Subject: Scheduler: Introduced JobStatus instead of simple success boolean This changes the deepest callback from when a Job completes to propagate a JobStatus value instead of a simple boolean, and updates all of the effected code paths which used to receive a boolean to now handle the JobStatus values. This further improves the situation for issue #753, as now we avoid queueing cache size jobs for pull jobs which are skipped. --- buildstream/_frontend/app.py | 6 ++--- buildstream/_scheduler/__init__.py | 2 +- buildstream/_scheduler/jobs/__init__.py | 1 + buildstream/_scheduler/jobs/cachesizejob.py | 6 ++--- buildstream/_scheduler/jobs/cleanupjob.py | 6 ++--- buildstream/_scheduler/jobs/elementjob.py | 6 ++--- buildstream/_scheduler/jobs/job.py | 36 +++++++++++++++++++++++++---- buildstream/_scheduler/queues/buildqueue.py | 6 ++--- buildstream/_scheduler/queues/fetchqueue.py | 5 ++-- buildstream/_scheduler/queues/pullqueue.py | 8 ++++--- buildstream/_scheduler/queues/queue.py | 13 +++++------ buildstream/_scheduler/queues/trackqueue.py | 5 ++-- buildstream/_scheduler/scheduler.py | 6 ++--- 13 files changed, 68 insertions(+), 38 deletions(-) diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py index 063ca1ec2..af38ae901 100644 --- a/buildstream/_frontend/app.py +++ b/buildstream/_frontend/app.py @@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages from .._stream import Stream from .._versions import BST_FORMAT_VERSION from .. import _yaml -from .._scheduler import ElementJob +from .._scheduler import ElementJob, JobStatus # Import frontend assets from . import Profile, LogLine, Status @@ -515,13 +515,13 @@ class App(): self._status.add_job(job) self._maybe_render_status() - def _job_completed(self, job, success): + def _job_completed(self, job, status): self._status.remove_job(job) self._maybe_render_status() # Dont attempt to handle a failure if the user has already opted to # terminate - if not success and not self.stream.terminated: + if status == JobStatus.FAIL and not self.stream.terminated: if isinstance(job, ElementJob): element = job.element diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py index b6e3eeb94..470859864 100644 --- a/buildstream/_scheduler/__init__.py +++ b/buildstream/_scheduler/__init__.py @@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue from .queues.pullqueue import PullQueue from .scheduler import Scheduler, SchedStatus -from .jobs import ElementJob +from .jobs import ElementJob, JobStatus diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py index 4b0b11dac..3e213171a 100644 --- a/buildstream/_scheduler/jobs/__init__.py +++ b/buildstream/_scheduler/jobs/__init__.py @@ -20,3 +20,4 @@ from .elementjob import ElementJob from .cachesizejob import CacheSizeJob from .cleanupjob import CleanupJob +from .job import JobStatus diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py index d46fd4c16..6e4698af9 100644 --- a/buildstream/_scheduler/jobs/cachesizejob.py +++ b/buildstream/_scheduler/jobs/cachesizejob.py @@ -16,7 +16,7 @@ # Author: # Tristan Daniël Maat # -from .job import Job +from .job import Job, JobStatus class CacheSizeJob(Job): @@ -30,8 +30,8 @@ class CacheSizeJob(Job): def child_process(self): return self._artifacts.compute_cache_size() - def parent_complete(self, success, result): - if success: + def parent_complete(self, status, result): + if status == JobStatus.OK: self._artifacts.set_cache_size(result) if self._complete_cb: diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py index 8bdbba0ed..e579e9718 100644 --- a/buildstream/_scheduler/jobs/cleanupjob.py +++ b/buildstream/_scheduler/jobs/cleanupjob.py @@ -16,7 +16,7 @@ # Author: # Tristan Daniël Maat # -from .job import Job +from .job import Job, JobStatus class CleanupJob(Job): @@ -29,6 +29,6 @@ class CleanupJob(Job): def child_process(self): return self._artifacts.clean() - def parent_complete(self, success, result): - if success: + def parent_complete(self, status, result): + if status == JobStatus.OK: self._artifacts.set_cache_size(result) diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py index 8ce5c062f..fa0d34fb3 100644 --- a/buildstream/_scheduler/jobs/elementjob.py +++ b/buildstream/_scheduler/jobs/elementjob.py @@ -60,7 +60,7 @@ from .job import Job # Args: # job (Job): The job object which completed # element (Element): The element passed to the Job() constructor -# success (bool): True if the action_cb did not raise an exception +# status (JobStatus): The status of whether the workload raised an exception # result (object): The deserialized object returned by the `action_cb`, or None # if `success` is False # @@ -93,8 +93,8 @@ class ElementJob(Job): # Run the action return self._action_cb(self._element) - def parent_complete(self, success, result): - self._complete_cb(self, self._element, success, self._result) + def parent_complete(self, status, result): + self._complete_cb(self, self._element, status, self._result) def message(self, message_type, message, **kwargs): args = dict(kwargs) diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index a36483b0c..d966a6985 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -41,6 +41,22 @@ RC_PERM_FAIL = 2 RC_SKIPPED = 3 +# JobStatus: +# +# The job completion status, passed back through the +# complete callbacks. +# +class JobStatus(): + # Job succeeded + OK = 0 + + # A temporary BstError was raised + FAIL = 1 + + # A SkipJob was raised + SKIPPED = 3 + + # Used to distinguish between status messages and return values class Envelope(): def __init__(self, message_type, message): @@ -295,10 +311,10 @@ class Job(): # pass the result to the main thread. # # Args: - # success (bool): Whether the job was successful. + # status (JobStatus): The job exit status # result (any): The result returned by child_process(). # - def parent_complete(self, success, result): + def parent_complete(self, status, result): raise ImplError("Job '{kind}' does not implement parent_complete()" .format(kind=type(self).__name__)) @@ -569,9 +585,19 @@ class Job(): self.spawn() return - success = returncode in (RC_OK, RC_SKIPPED) - self.parent_complete(success, self._result) - self._scheduler.job_completed(self, success) + # Resolve the outward facing overall job completion status + # + if returncode == RC_OK: + status = JobStatus.OK + elif returncode == RC_SKIPPED: + status = JobStatus.SKIPPED + elif returncode in (RC_FAIL, RC_PERM_FAIL): + status = JobStatus.FAIL + else: + status = JobStatus.FAIL + + self.parent_complete(status, self._result) + self._scheduler.job_completed(self, status) # Force the deletion of the queue and process objects to try and clean up FDs self._queue = self._process = None diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index d05327557..9d8e7182b 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -21,7 +21,7 @@ from datetime import timedelta from . import Queue, QueueStatus -from ..jobs import ElementJob +from ..jobs import ElementJob, JobStatus from ..resources import ResourceType from ..._message import MessageType @@ -104,7 +104,7 @@ class BuildQueue(Queue): if artifacts.has_quota_exceeded(): self._scheduler.check_cache_size() - def done(self, job, element, result, success): + def done(self, job, element, result, status): # Inform element in main process that assembly is done element._assemble_done() @@ -117,5 +117,5 @@ class BuildQueue(Queue): # artifact cache size for a successful build even though we know a # failed build also grows the artifact cache size. # - if success: + if status == JobStatus.OK: self._check_cache_size(job, element, result) diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index c58bfdb57..fc11fd1d1 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -24,6 +24,7 @@ from ... import Consistency # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..jobs import JobStatus # A queue which fetches element sources @@ -66,9 +67,9 @@ class FetchQueue(Queue): return QueueStatus.READY - def done(self, _, element, result, success): + def done(self, _, element, result, status): - if not success: + if status == JobStatus.FAIL: return element._update_state() diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index b861373a9..dbeb806e5 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..jobs import JobStatus from ..._exceptions import SkipJob @@ -54,9 +55,9 @@ class PullQueue(Queue): else: return QueueStatus.SKIP - def done(self, _, element, result, success): + def done(self, _, element, result, status): - if not success: + if status == JobStatus.FAIL: return element._pull_done() @@ -64,4 +65,5 @@ class PullQueue(Queue): # Build jobs will check the "approximate" size first. Since we # do not get an artifact size from pull jobs, we have to # actually check the cache size. - self._scheduler.check_cache_size() + if status == JobStatus.OK: + self._scheduler.check_cache_size() diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 055e2f84b..707fcf511 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -25,7 +25,7 @@ from enum import Enum import traceback # Local imports -from ..jobs import ElementJob +from ..jobs import ElementJob, JobStatus from ..resources import ResourceType # BuildStream toplevel imports @@ -133,10 +133,9 @@ class Queue(): # job (Job): The job which completed processing # element (Element): The element which completed processing # result (any): The return value of the process() implementation - # success (bool): True if the process() implementation did not - # raise any exception + # status (JobStatus): The return status of the Job # - def done(self, job, element, result, success): + def done(self, job, element, result, status): pass ##################################################### @@ -291,7 +290,7 @@ class Queue(): # # See the Job object for an explanation of the call signature # - def _job_done(self, job, element, success, result): + def _job_done(self, job, element, status, result): # Update values that need to be synchronized in the main task # before calling any queue implementation @@ -301,7 +300,7 @@ class Queue(): # and determine if it should be considered as processed # or skipped. try: - self.done(job, element, result, success) + self.done(job, element, result, status) except BstError as e: # Report error and mark as failed @@ -337,7 +336,7 @@ class Queue(): # if they are not skipped. if job.skipped: self.skipped_elements.append(element) - elif success: + elif status == JobStatus.OK: self.processed_elements.append(element) else: self.failed_elements.append(element) diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index 514418086..245b528e6 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..jobs import JobStatus # A queue which tracks sources @@ -47,9 +48,9 @@ class TrackQueue(Queue): return QueueStatus.READY - def done(self, _, element, result, success): + def done(self, _, element, result, status): - if not success: + if status == JobStatus.FAIL: return # Set the new refs in the main process one by one as they complete diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index ecbfef3ed..97335de34 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -237,14 +237,14 @@ class Scheduler(): # Args: # queue (Queue): The Queue holding a complete job # job (Job): The completed Job - # success (bool): Whether the Job completed with a success status + # status (JobStatus): The status of the completed job # - def job_completed(self, job, success): + def job_completed(self, job, status): self._resources.clear_job_resources(job) self.active_jobs.remove(job) if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS: self._exclusive_active.remove(job.action_name) - self._job_complete_callback(job, success) + self._job_complete_callback(job, status) self._schedule_queue_jobs() self._sched() -- cgit v1.2.1 From c2fc2a5ea468c52e02886bac4ba890b4bce8f4b5 Mon Sep 17 00:00:00 2001 From: Tristan Van Berkom Date: Mon, 7 Jan 2019 12:55:23 -0500 Subject: _scheduler/jobs/job.py: Removed 'skipped' property This is redundant now that we report it through the JobStatus. --- buildstream/_scheduler/jobs/job.py | 16 ---------------- buildstream/_scheduler/queues/queue.py | 6 ++---- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index d966a6985..837469a39 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -132,7 +132,6 @@ class Job(): self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs - self._skipped_flag = False # Indicate whether the job was skipped. self._terminated = False # Whether this job has been explicitly terminated # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries. @@ -289,18 +288,6 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id - # skipped - # - # This will evaluate to True if the job was skipped - # during processing, or if it was forcefully terminated. - # - # Returns: - # (bool): Whether the job should appear as skipped - # - @property - def skipped(self): - return self._skipped_flag or self._terminated - ####################################################### # Abstract Methods # ####################################################### @@ -578,9 +565,6 @@ class Job(): # self._retry_flag = returncode == RC_FAIL - # Set the flag to alert Queue that this job skipped. - self._skipped_flag = returncode == RC_SKIPPED - if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.spawn() return diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 707fcf511..81760ace4 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -331,10 +331,8 @@ class Queue(): # All jobs get placed on the done queue for later processing. self._done_queue.append(job) - # A Job can be skipped whether or not it has failed, - # we want to only bookkeep them as processed or failed - # if they are not skipped. - if job.skipped: + # These lists are for bookkeeping purposes for the UI and logging. + if status == JobStatus.SKIPPED: self.skipped_elements.append(element) elif status == JobStatus.OK: self.processed_elements.append(element) -- cgit v1.2.1