diff options
Diffstat (limited to 'buildstream')
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 4 | ||||
-rw-r--r-- | buildstream/_exceptions.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 30 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 2 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 4 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pullqueue.py | 8 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pushqueue.py | 14 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/trackqueue.py | 11 |
9 files changed, 47 insertions, 44 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 4ef8723a3..7d090e228 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -250,7 +250,7 @@ class CASCache(ArtifactCache): else: self.context.message(Message( None, - MessageType.SKIPPED, + MessageType.INFO, "Remote ({}) does not have {} cached".format( remote.spec.url, element._get_brief_display_key()) )) @@ -361,7 +361,7 @@ class CASCache(ArtifactCache): if skipped_remote: self.context.message(Message( None, - MessageType.SKIPPED, + MessageType.INFO, "Remote ({}) already has {} cached".format( remote.spec.url, element._get_brief_display_key()) )) diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index fa595852b..d4ab1ea52 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -311,3 +311,12 @@ class StreamError(BstError): class AppError(BstError): def __init__(self, message, detail=None, reason=None): super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason) + + +# SkipJob +# +# Raised from a child process within a job when the job should be +# considered skipped by the parent process. +# +class SkipJob(Exception): + pass diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 165c7c83f..d77fa0c82 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -31,7 +31,7 @@ import multiprocessing import psutil # BuildStream toplevel imports -from ..._exceptions import ImplError, BstError, set_last_task_error +from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils @@ -40,6 +40,7 @@ from ... import _signals, utils RC_OK = 0 RC_FAIL = 1 RC_PERM_FAIL = 2 +RC_SKIPPED = 3 # Used to distinguish between status messages and return values @@ -117,7 +118,7 @@ class Job(): self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs - + self._skipped_flag = False # Indicate whether the job was skipped. # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries. # self._retry_flag = True @@ -275,6 +276,14 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id + # skipped + # + # Returns: + # bool: True if the job was skipped while processing. + @property + def skipped(self): + return self._skipped_flag + ####################################################### # Abstract Methods # ####################################################### @@ -396,6 +405,13 @@ class Job(): try: # Try the task action result = self.child_process() + except SkipJob as e: + elapsed = datetime.datetime.now() - starttime + self.message(MessageType.SKIPPED, str(e), + elapsed=elapsed, logfile=filename) + + # Alert parent of skip by return code + self._child_shutdown(RC_SKIPPED) except BstError as e: elapsed = datetime.datetime.now() - starttime self._retry_flag = e.temporary @@ -543,14 +559,18 @@ class Job(): # We don't want to retry if we got OK or a permanent fail. # This is set in _child_action but must also be set for the parent. # - self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL) + self._retry_flag = returncode == RC_FAIL + + # Set the flag to alert Queue that this job skipped. + self._skipped_flag = returncode == RC_SKIPPED if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.spawn() return - self.parent_complete(returncode == RC_OK, self._result) - self._scheduler.job_completed(self, returncode == RC_OK) + success = returncode in (RC_OK, RC_SKIPPED) + self.parent_complete(success, self._result) + self._scheduler.job_completed(self, success) # _parent_process_envelope() # diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index d3d2fad3e..90e3ad792 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -77,5 +77,3 @@ class BuildQueue(Queue): # This has to be done after _assemble_done, such that the # element may register its cache key as required self._check_cache_size(job, element, result) - - return True diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index 265890b7a..114790c05 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -70,11 +70,9 @@ class FetchQueue(Queue): def done(self, _, element, result, success): if not success: - return False + return element._update_state() # Successful fetch, we must be CACHED now assert element._get_consistency() == Consistency.CACHED - - return True diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index e18967cf4..2842c5e21 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..._exceptions import SkipJob # A queue which pulls element artifacts @@ -33,7 +34,8 @@ class PullQueue(Queue): def process(self, element): # returns whether an artifact was downloaded or not - return element._pull() + if not element._pull(): + raise SkipJob(self.action_name) def status(self, element): # state of dependencies may have changed, recalculate element state @@ -63,7 +65,3 @@ class PullQueue(Queue): # do not get an artifact size from pull jobs, we have to # actually check the cache size. self._scheduler.check_cache_size() - - # Element._pull() returns True if it downloaded an artifact, - # here we want to appear skipped if we did not download. - return result diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py index 568e053d6..35532d23d 100644 --- a/buildstream/_scheduler/queues/pushqueue.py +++ b/buildstream/_scheduler/queues/pushqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..._exceptions import SkipJob # A queue which pushes element artifacts @@ -33,20 +34,11 @@ class PushQueue(Queue): def process(self, element): # returns whether an artifact was uploaded or not - return element._push() + if not element._push(): + raise SkipJob(self.action_name) def status(self, element): if element._skip_push(): return QueueStatus.SKIP return QueueStatus.READY - - def done(self, _, element, result, success): - - if not success: - return False - - # Element._push() returns True if it uploaded an artifact, - # here we want to appear skipped if the remote already had - # the artifact. - return result diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 2f875881f..15467ca67 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -136,10 +136,6 @@ class Queue(): # success (bool): True if the process() implementation did not # raise any exception # - # Returns: - # (bool): True if the element should appear to be processsed, - # Otherwise False will count the element as "skipped" - # def done(self, job, element, result, success): pass @@ -305,8 +301,7 @@ class Queue(): # and determine if it should be considered as processed # or skipped. try: - processed = self.done(job, element, result, success) - + self.done(job, element, result, success) except BstError as e: # Report error and mark as failed @@ -335,7 +330,7 @@ class Queue(): # if success: self._done_queue.append(job) - if processed: + if not job.skipped: self.processed_elements.append(element) else: self.skipped_elements.append(element) diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index f443df3be..133655e14 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -51,18 +51,11 @@ class TrackQueue(Queue): def done(self, _, element, result, success): if not success: - return False - - changed = False + return # Set the new refs in the main process one by one as they complete for unique_id, new_ref in result: source = _plugin_lookup(unique_id) - # We appear processed if at least one source has changed - if source._save_ref(new_ref): - changed = True + source._save_ref(new_ref) element._tracking_done() - - # We'll appear as a skipped element if tracking resulted in no change - return changed |