diff options
author | Josh Smith <qinusty@gmail.com> | 2018-09-07 15:36:43 +0100 |
---|---|---|
committer | Josh Smith <qinusty@gmail.com> | 2018-09-19 10:25:21 +0100 |
commit | ca811a4d5905b61862c230e17925ab9152a78b16 (patch) | |
tree | a9a647cff50c300f1634848a6061f94916c1f3ad | |
parent | 72b5902157316e173de2eec5b3a2772283eec3c7 (diff) | |
download | buildstream-ca811a4d5905b61862c230e17925ab9152a78b16.tar.gz |
Rework Skipped usage
The SKIPPED message type is now used to mark the end of a task that
completed successfully without actually having to perform any work.
This overhauls the use of `Queue.done()`: queues no longer need to
return a processed/skipped value from `done()`. Instead, a queue
signals that a job was skipped by raising a `SkipJob` exception from
within `Queue.process()`.
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 4 | ||||
-rw-r--r-- | buildstream/_exceptions.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 30 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 3 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 4 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pullqueue.py | 8 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pushqueue.py | 14 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/trackqueue.py | 11 | ||||
-rw-r--r-- | tests/frontend/pull.py | 3 |
10 files changed, 50 insertions, 45 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 9cf83a222..1a48c4065 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -253,7 +253,7 @@ class CASCache(ArtifactCache): else: self.context.message(Message( None, - MessageType.SKIPPED, + MessageType.INFO, "Remote ({}) does not have {} cached".format( remote.spec.url, element._get_brief_display_key()) )) @@ -344,7 +344,7 @@ class CASCache(ArtifactCache): else: self.context.message(Message( None, - MessageType.SKIPPED, + MessageType.INFO, "Remote ({}) already has {} cached".format( remote.spec.url, element._get_brief_display_key()) )) diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index 6fe4f4847..19606776e 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -312,3 +312,12 @@ class StreamError(BstError): class AppError(BstError): def __init__(self, message, detail=None, reason=None): super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason) + + +# SkipJob +# +# Raised from a child process within a job when the job should be +# considered skipped by the parent process. +# +class SkipJob(Exception): + pass diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index c55219b58..1c6b4a582 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -31,7 +31,7 @@ import multiprocessing import psutil # BuildStream toplevel imports -from ..._exceptions import ImplError, BstError, set_last_task_error +from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils @@ -40,6 +40,7 @@ from ... 
import _signals, utils RC_OK = 0 RC_FAIL = 1 RC_PERM_FAIL = 2 +RC_SKIPPED = 3 # Used to distinguish between status messages and return values @@ -117,7 +118,7 @@ class Job(): self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs - + self._skipped_flag = False # Indicate whether the job was skipped. # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries. # self._retry_flag = True @@ -277,6 +278,14 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id + # skipped + # + # Returns: + # bool: True if the job was skipped while processing. + @property + def skipped(self): + return self._skipped_flag + ####################################################### # Abstract Methods # ####################################################### @@ -398,6 +407,13 @@ class Job(): try: # Try the task action result = self.child_process() + except SkipJob as e: + elapsed = datetime.datetime.now() - starttime + self.message(MessageType.SKIPPED, str(e), + elapsed=elapsed, logfile=filename) + + # Alert parent of skip by return code + self._child_shutdown(RC_SKIPPED) except BstError as e: elapsed = datetime.datetime.now() - starttime self._retry_flag = e.temporary @@ -545,14 +561,18 @@ class Job(): # We don't want to retry if we got OK or a permanent fail. # This is set in _child_action but must also be set for the parent. # - self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL) + self._retry_flag = returncode == RC_FAIL + + # Set the flag to alert Queue that this job skipped. 
+ self._skipped_flag = returncode == RC_SKIPPED if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.spawn() return - self.parent_complete(returncode == RC_OK, self._result) - self._scheduler.job_completed(self, returncode == RC_OK) + success = returncode in (RC_OK, RC_SKIPPED) + self.parent_complete(success, self._result) + self._scheduler.job_completed(self, success) # Force the deletion of the queue and process objects to try and clean up FDs self._queue = self._process = None diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index 0c74b3698..39ed83a32 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -47,6 +47,7 @@ class BuildQueue(Queue): to_queue.append(element) continue + # XXX: Fix this, See https://mail.gnome.org/archives/buildstream-list/2018-September/msg00029.html # Bypass queue processing entirely the first time it's tried. 
self._tried.add(element) _, description, detail = element._get_build_result() @@ -113,5 +114,3 @@ class BuildQueue(Queue): # This has to be done after _assemble_done, such that the # element may register its cache key as required self._check_cache_size(job, element, result) - - return True diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index bd90a13b6..446dbbd3b 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -72,11 +72,9 @@ class FetchQueue(Queue): def done(self, _, element, result, success): if not success: - return False + return element._update_state() # Successful fetch, we must be CACHED now assert element._get_consistency() == Consistency.CACHED - - return True diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index e18967cf4..2842c5e21 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..._exceptions import SkipJob # A queue which pulls element artifacts @@ -33,7 +34,8 @@ class PullQueue(Queue): def process(self, element): # returns whether an artifact was downloaded or not - return element._pull() + if not element._pull(): + raise SkipJob(self.action_name) def status(self, element): # state of dependencies may have changed, recalculate element state @@ -63,7 +65,3 @@ class PullQueue(Queue): # do not get an artifact size from pull jobs, we have to # actually check the cache size. self._scheduler.check_cache_size() - - # Element._pull() returns True if it downloaded an artifact, - # here we want to appear skipped if we did not download. 
- return result diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py index 568e053d6..35532d23d 100644 --- a/buildstream/_scheduler/queues/pushqueue.py +++ b/buildstream/_scheduler/queues/pushqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..._exceptions import SkipJob # A queue which pushes element artifacts @@ -33,20 +34,11 @@ class PushQueue(Queue): def process(self, element): # returns whether an artifact was uploaded or not - return element._push() + if not element._push(): + raise SkipJob(self.action_name) def status(self, element): if element._skip_push(): return QueueStatus.SKIP return QueueStatus.READY - - def done(self, _, element, result, success): - - if not success: - return False - - # Element._push() returns True if it uploaded an artifact, - # here we want to appear skipped if the remote already had - # the artifact. - return result diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 472e033da..f058663a1 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -136,10 +136,6 @@ class Queue(): # success (bool): True if the process() implementation did not # raise any exception # - # Returns: - # (bool): True if the element should appear to be processsed, - # Otherwise False will count the element as "skipped" - # def done(self, job, element, result, success): pass @@ -306,8 +302,7 @@ class Queue(): # and determine if it should be considered as processed # or skipped. 
try: - processed = self.done(job, element, result, success) - + self.done(job, element, result, success) except BstError as e: # Report error and mark as failed @@ -337,7 +332,7 @@ class Queue(): self._done_queue.append(job) if success: - if processed: + if not job.skipped: self.processed_elements.append(element) else: self.skipped_elements.append(element) diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index f443df3be..133655e14 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -51,18 +51,11 @@ class TrackQueue(Queue): def done(self, _, element, result, success): if not success: - return False - - changed = False + return # Set the new refs in the main process one by one as they complete for unique_id, new_ref in result: source = _plugin_lookup(unique_id) - # We appear processed if at least one source has changed - if source._save_ref(new_ref): - changed = True + source._save_ref(new_ref) element._tracking_done() - - # We'll appear as a skipped element if tracking resulted in no change - return changed diff --git a/tests/frontend/pull.py b/tests/frontend/pull.py index ed9a9643e..c883e2030 100644 --- a/tests/frontend/pull.py +++ b/tests/frontend/pull.py @@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles): assert not result.get_pulled_elements(), \ "No elements should have been pulled since the cache was empty" - assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr + assert "INFO Remote ({}) does not have".format(share.repo) in result.stderr + assert "SKIPPED Pull" in result.stderr |