summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTristan Van Berkom <tristan.van.berkom@gmail.com>2019-01-07 18:50:23 +0000
committerTristan Van Berkom <tristan.van.berkom@gmail.com>2019-01-07 18:50:23 +0000
commit3e3984adf2bee0d5804a1762895002ca3e29cb33 (patch)
tree28aa95e4046b0f5d6d61e8a52795c40c6c0dee94
parent5de42d43061160ddcd62e8bb6ea3dc3ea151ab07 (diff)
parentc2fc2a5ea468c52e02886bac4ba890b4bce8f4b5 (diff)
downloadbuildstream-3e3984adf2bee0d5804a1762895002ca3e29cb33.tar.gz
Merge branch 'tristan/one-cache-size-job' into 'master'
Only queue one cache size job Closes #753 See merge request BuildStream/buildstream!1040
-rw-r--r--buildstream/_frontend/app.py6
-rw-r--r--buildstream/_scheduler/__init__.py2
-rw-r--r--buildstream/_scheduler/jobs/__init__.py1
-rw-r--r--buildstream/_scheduler/jobs/cachesizejob.py6
-rw-r--r--buildstream/_scheduler/jobs/cleanupjob.py6
-rw-r--r--buildstream/_scheduler/jobs/elementjob.py6
-rw-r--r--buildstream/_scheduler/jobs/job.py52
-rw-r--r--buildstream/_scheduler/queues/buildqueue.py6
-rw-r--r--buildstream/_scheduler/queues/fetchqueue.py5
-rw-r--r--buildstream/_scheduler/queues/pullqueue.py8
-rw-r--r--buildstream/_scheduler/queues/queue.py19
-rw-r--r--buildstream/_scheduler/queues/trackqueue.py5
-rw-r--r--buildstream/_scheduler/scheduler.py93
13 files changed, 133 insertions, 82 deletions
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 063ca1ec2..af38ae901 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -38,7 +38,7 @@ from .._message import Message, MessageType, unconditional_messages
from .._stream import Stream
from .._versions import BST_FORMAT_VERSION
from .. import _yaml
-from .._scheduler import ElementJob
+from .._scheduler import ElementJob, JobStatus
# Import frontend assets
from . import Profile, LogLine, Status
@@ -515,13 +515,13 @@ class App():
self._status.add_job(job)
self._maybe_render_status()
- def _job_completed(self, job, success):
+ def _job_completed(self, job, status):
self._status.remove_job(job)
self._maybe_render_status()
# Dont attempt to handle a failure if the user has already opted to
# terminate
- if not success and not self.stream.terminated:
+ if status == JobStatus.FAIL and not self.stream.terminated:
if isinstance(job, ElementJob):
element = job.element
diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py
index b6e3eeb94..470859864 100644
--- a/buildstream/_scheduler/__init__.py
+++ b/buildstream/_scheduler/__init__.py
@@ -26,4 +26,4 @@ from .queues.pushqueue import PushQueue
from .queues.pullqueue import PullQueue
from .scheduler import Scheduler, SchedStatus
-from .jobs import ElementJob
+from .jobs import ElementJob, JobStatus
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
index 4b0b11dac..3e213171a 100644
--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -20,3 +20,4 @@
from .elementjob import ElementJob
from .cachesizejob import CacheSizeJob
from .cleanupjob import CleanupJob
+from .job import JobStatus
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
index d46fd4c16..6e4698af9 100644
--- a/buildstream/_scheduler/jobs/cachesizejob.py
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job
+from .job import Job, JobStatus
class CacheSizeJob(Job):
@@ -30,8 +30,8 @@ class CacheSizeJob(Job):
def child_process(self):
return self._artifacts.compute_cache_size()
- def parent_complete(self, success, result):
- if success:
+ def parent_complete(self, status, result):
+ if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
if self._complete_cb:
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
index 8bdbba0ed..e579e9718 100644
--- a/buildstream/_scheduler/jobs/cleanupjob.py
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -16,7 +16,7 @@
# Author:
# Tristan Daniël Maat <tristan.maat@codethink.co.uk>
#
-from .job import Job
+from .job import Job, JobStatus
class CleanupJob(Job):
@@ -29,6 +29,6 @@ class CleanupJob(Job):
def child_process(self):
return self._artifacts.clean()
- def parent_complete(self, success, result):
- if success:
+ def parent_complete(self, status, result):
+ if status == JobStatus.OK:
self._artifacts.set_cache_size(result)
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 8ce5c062f..fa0d34fb3 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -60,7 +60,7 @@ from .job import Job
# Args:
# job (Job): The job object which completed
# element (Element): The element passed to the Job() constructor
-# success (bool): True if the action_cb did not raise an exception
+# status (JobStatus): The status of whether the workload raised an exception
# result (object): The deserialized object returned by the `action_cb`, or None
# if `success` is False
#
@@ -93,8 +93,8 @@ class ElementJob(Job):
# Run the action
return self._action_cb(self._element)
- def parent_complete(self, success, result):
- self._complete_cb(self, self._element, success, self._result)
+ def parent_complete(self, status, result):
+ self._complete_cb(self, self._element, status, self._result)
def message(self, message_type, message, **kwargs):
args = dict(kwargs)
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index a36483b0c..837469a39 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -41,6 +41,22 @@ RC_PERM_FAIL = 2
RC_SKIPPED = 3
+# JobStatus:
+#
+# The job completion status, passed back through the
+# complete callbacks.
+#
+class JobStatus():
+ # Job succeeded
+ OK = 0
+
+ # A temporary BstError was raised
+ FAIL = 1
+
+ # A SkipJob was raised
+ SKIPPED = 3
+
+
# Used to distinguish between status messages and return values
class Envelope():
def __init__(self, message_type, message):
@@ -116,7 +132,6 @@ class Job():
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
- self._skipped_flag = False # Indicate whether the job was skipped.
self._terminated = False # Whether this job has been explicitly terminated
# If False, a retry will not be attempted regardless of whether _tries is less than _max_retries.
@@ -273,18 +288,6 @@ class Job():
def set_task_id(self, task_id):
self._task_id = task_id
- # skipped
- #
- # This will evaluate to True if the job was skipped
- # during processing, or if it was forcefully terminated.
- #
- # Returns:
- # (bool): Whether the job should appear as skipped
- #
- @property
- def skipped(self):
- return self._skipped_flag or self._terminated
-
#######################################################
# Abstract Methods #
#######################################################
@@ -295,10 +298,10 @@ class Job():
# pass the result to the main thread.
#
# Args:
- # success (bool): Whether the job was successful.
+ # status (JobStatus): The job exit status
# result (any): The result returned by child_process().
#
- def parent_complete(self, success, result):
+ def parent_complete(self, status, result):
raise ImplError("Job '{kind}' does not implement parent_complete()"
.format(kind=type(self).__name__))
@@ -562,16 +565,23 @@ class Job():
#
self._retry_flag = returncode == RC_FAIL
- # Set the flag to alert Queue that this job skipped.
- self._skipped_flag = returncode == RC_SKIPPED
-
if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated:
self.spawn()
return
- success = returncode in (RC_OK, RC_SKIPPED)
- self.parent_complete(success, self._result)
- self._scheduler.job_completed(self, success)
+ # Resolve the outward facing overall job completion status
+ #
+ if returncode == RC_OK:
+ status = JobStatus.OK
+ elif returncode == RC_SKIPPED:
+ status = JobStatus.SKIPPED
+ elif returncode in (RC_FAIL, RC_PERM_FAIL):
+ status = JobStatus.FAIL
+ else:
+ status = JobStatus.FAIL
+
+ self.parent_complete(status, self._result)
+ self._scheduler.job_completed(self, status)
# Force the deletion of the queue and process objects to try and clean up FDs
self._queue = self._process = None
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index d05327557..9d8e7182b 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -21,7 +21,7 @@
from datetime import timedelta
from . import Queue, QueueStatus
-from ..jobs import ElementJob
+from ..jobs import ElementJob, JobStatus
from ..resources import ResourceType
from ..._message import MessageType
@@ -104,7 +104,7 @@ class BuildQueue(Queue):
if artifacts.has_quota_exceeded():
self._scheduler.check_cache_size()
- def done(self, job, element, result, success):
+ def done(self, job, element, result, status):
# Inform element in main process that assembly is done
element._assemble_done()
@@ -117,5 +117,5 @@ class BuildQueue(Queue):
# artifact cache size for a successful build even though we know a
# failed build also grows the artifact cache size.
#
- if success:
+ if status == JobStatus.OK:
self._check_cache_size(job, element, result)
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index c58bfdb57..fc11fd1d1 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -24,6 +24,7 @@ from ... import Consistency
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..jobs import JobStatus
# A queue which fetches element sources
@@ -66,9 +67,9 @@ class FetchQueue(Queue):
return QueueStatus.READY
- def done(self, _, element, result, success):
+ def done(self, _, element, result, status):
- if not success:
+ if status == JobStatus.FAIL:
return
element._update_state()
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index b861373a9..dbeb806e5 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -21,6 +21,7 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..jobs import JobStatus
from ..._exceptions import SkipJob
@@ -54,9 +55,9 @@ class PullQueue(Queue):
else:
return QueueStatus.SKIP
- def done(self, _, element, result, success):
+ def done(self, _, element, result, status):
- if not success:
+ if status == JobStatus.FAIL:
return
element._pull_done()
@@ -64,4 +65,5 @@ class PullQueue(Queue):
# Build jobs will check the "approximate" size first. Since we
# do not get an artifact size from pull jobs, we have to
# actually check the cache size.
- self._scheduler.check_cache_size()
+ if status == JobStatus.OK:
+ self._scheduler.check_cache_size()
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 055e2f84b..81760ace4 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -25,7 +25,7 @@ from enum import Enum
import traceback
# Local imports
-from ..jobs import ElementJob
+from ..jobs import ElementJob, JobStatus
from ..resources import ResourceType
# BuildStream toplevel imports
@@ -133,10 +133,9 @@ class Queue():
# job (Job): The job which completed processing
# element (Element): The element which completed processing
# result (any): The return value of the process() implementation
- # success (bool): True if the process() implementation did not
- # raise any exception
+ # status (JobStatus): The return status of the Job
#
- def done(self, job, element, result, success):
+ def done(self, job, element, result, status):
pass
#####################################################
@@ -291,7 +290,7 @@ class Queue():
#
# See the Job object for an explanation of the call signature
#
- def _job_done(self, job, element, success, result):
+ def _job_done(self, job, element, status, result):
# Update values that need to be synchronized in the main task
# before calling any queue implementation
@@ -301,7 +300,7 @@ class Queue():
# and determine if it should be considered as processed
# or skipped.
try:
- self.done(job, element, result, success)
+ self.done(job, element, result, status)
except BstError as e:
# Report error and mark as failed
@@ -332,12 +331,10 @@ class Queue():
# All jobs get placed on the done queue for later processing.
self._done_queue.append(job)
- # A Job can be skipped whether or not it has failed,
- # we want to only bookkeep them as processed or failed
- # if they are not skipped.
- if job.skipped:
+ # These lists are for bookkeeping purposes for the UI and logging.
+ if status == JobStatus.SKIPPED:
self.skipped_elements.append(element)
- elif success:
+ elif status == JobStatus.OK:
self.processed_elements.append(element)
else:
self.failed_elements.append(element)
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index 514418086..245b528e6 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -24,6 +24,7 @@ from ...plugin import _plugin_lookup
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
+from ..jobs import JobStatus
# A queue which tracks sources
@@ -47,9 +48,9 @@ class TrackQueue(Queue):
return QueueStatus.READY
- def done(self, _, element, result, success):
+ def done(self, _, element, result, status):
- if not success:
+ if status == JobStatus.FAIL:
return
# Set the new refs in the main process one by one as they complete
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index b76c7308e..97335de34 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -38,6 +38,16 @@ class SchedStatus():
TERMINATED = 1
+# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
+# which we launch dynamically, they have the property of being
+# meaningless to queue if one is already queued, and it also
+# doesnt make sense to run them in parallel
+#
+_ACTION_NAME_CLEANUP = 'cleanup'
+_ACTION_NAME_CACHE_SIZE = 'cache_size'
+_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
+
+
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -94,6 +104,15 @@ class Scheduler():
self._suspendtime = None
self._queue_jobs = True # Whether we should continue to queue jobs
+ # Whether our exclusive jobs, like 'cleanup' are currently already
+ # waiting or active.
+ #
+ # This is just a bit quicker than scanning the wait queue and active
+ # queue and comparing job action names.
+ #
+ self._exclusive_waiting = set()
+ self._exclusive_active = set()
+
self._resources = Resources(context.sched_builders,
context.sched_fetchers,
context.sched_pushers)
@@ -211,19 +230,6 @@ class Scheduler():
starttime = timenow
return timenow - starttime
- # schedule_jobs()
- #
- # Args:
- # jobs ([Job]): A list of jobs to schedule
- #
- # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
- # run as soon any other queueing jobs finish, provided sufficient
- # resources are available for them to run
- #
- def schedule_jobs(self, jobs):
- for job in jobs:
- self.waiting_jobs.append(job)
-
# job_completed():
#
# Called when a Job completes
@@ -231,12 +237,14 @@ class Scheduler():
# Args:
# queue (Queue): The Queue holding a complete job
# job (Job): The completed Job
- # success (bool): Whether the Job completed with a success status
+ # status (JobStatus): The status of the completed job
#
- def job_completed(self, job, success):
+ def job_completed(self, job, status):
self._resources.clear_job_resources(job)
self.active_jobs.remove(job)
- self._job_complete_callback(job, success)
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+ self._exclusive_active.remove(job.action_name)
+ self._job_complete_callback(job, status)
self._schedule_queue_jobs()
self._sched()
@@ -246,18 +254,13 @@ class Scheduler():
# size is calculated, a cleanup job will be run automatically
# if needed.
#
- # FIXME: This should ensure that only one cache size job
- # is ever pending at a given time. If a cache size
- # job is already running, it is correct to queue
- # a new one, it is incorrect to have more than one
- # of these jobs pending at a given time, though.
- #
def check_cache_size(self):
- job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
+ job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
+ 'cache_size/cache_size',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
complete_cb=self._run_cleanup)
- self.schedule_jobs([job])
+ self._schedule_jobs([job])
#######################################################
# Local Private Methods #
@@ -276,10 +279,19 @@ class Scheduler():
if not self._resources.reserve_job_resources(job):
continue
+ # Postpone these jobs if one is already running
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
+ job.action_name in self._exclusive_active:
+ continue
+
job.spawn()
self.waiting_jobs.remove(job)
self.active_jobs.append(job)
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+ self._exclusive_waiting.remove(job.action_name)
+ self._exclusive_active.add(job.action_name)
+
if self._job_start_callback:
self._job_start_callback(job)
@@ -287,6 +299,33 @@ class Scheduler():
if not self.active_jobs and not self.waiting_jobs:
self.loop.stop()
+ # _schedule_jobs()
+ #
+ # The main entry point for jobs to be scheduled.
+ #
+ # This is called either as a result of scanning the queues
+ # in _schedule_queue_jobs(), or directly by the Scheduler
+ # to insert special jobs like cleanups.
+ #
+ # Args:
+ # jobs ([Job]): A list of jobs to schedule
+ #
+ def _schedule_jobs(self, jobs):
+ for job in jobs:
+
+ # Special treatment of our redundant exclusive jobs
+ #
+ if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+
+ # Drop the job if one is already queued
+ if job.action_name in self._exclusive_waiting:
+ continue
+
+ # Mark this action type as queued
+ self._exclusive_waiting.add(job.action_name)
+
+ self.waiting_jobs.append(job)
+
# _schedule_queue_jobs()
#
# Ask the queues what jobs they want to schedule and schedule
@@ -331,7 +370,7 @@ class Scheduler():
# the next queue and process them.
process_queues = any(q.dequeue_ready() for q in self.queues)
- self.schedule_jobs(ready)
+ self._schedule_jobs(ready)
self._sched()
# _run_cleanup()
@@ -353,11 +392,11 @@ class Scheduler():
if not artifacts.has_quota_exceeded():
return
- job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
+ job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
resources=[ResourceType.CACHE,
ResourceType.PROCESS],
exclusive_resources=[ResourceType.CACHE])
- self.schedule_jobs([job])
+ self._schedule_jobs([job])
# _suspend_jobs()
#