diff options
author | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2018-09-20 09:41:50 +0000 |
---|---|---|
committer | Tristan Van Berkom <tristan.van.berkom@gmail.com> | 2018-09-20 09:41:50 +0000 |
commit | c6322a4165ed04dcc7634ca28900c7d149b305d8 (patch) | |
tree | 6328c176a38813413ed50fcab33e6aeedec4d313 | |
parent | c7e5d8be081314e4ff8414677d163d1a961b0d05 (diff) | |
parent | 283ff3fcc1123a48d8143adff105265a7f40cac2 (diff) | |
download | buildstream-c6322a4165ed04dcc7634ca28900c7d149b305d8.tar.gz |
Merge branch 'Qinusty/skipped-rework-backport-1.2' into 'bst-1.2'
Backport skipped rework (!765)
See merge request BuildStream/buildstream!810
-rw-r--r-- | buildstream/_artifactcache/cascache.py | 21 | ||||
-rw-r--r-- | buildstream/_exceptions.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/jobs/job.py | 30 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/buildqueue.py | 2 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/fetchqueue.py | 4 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pullqueue.py | 8 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/pushqueue.py | 14 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/queue.py | 9 | ||||
-rw-r--r-- | buildstream/_scheduler/queues/trackqueue.py | 11 | ||||
-rw-r--r-- | buildstream/element.py | 17 | ||||
-rw-r--r-- | tests/frontend/pull.py | 3 | ||||
-rw-r--r-- | tests/frontend/push.py | 23 | ||||
-rw-r--r-- | tests/testutils/runcli.py | 2 |
13 files changed, 88 insertions, 65 deletions
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py index 4ef8723a3..b6b1d436d 100644 --- a/buildstream/_artifactcache/cascache.py +++ b/buildstream/_artifactcache/cascache.py @@ -225,8 +225,8 @@ class CASCache(ArtifactCache): for remote in self._remotes[project]: try: remote.init() - - element.info("Pulling {} <- {}".format(element._get_brief_display_key(), remote.spec.url)) + display_key = element._get_brief_display_key() + element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url)) request = buildstream_pb2.GetReferenceRequest() request.key = ref @@ -240,6 +240,7 @@ class CASCache(ArtifactCache): self.set_ref(ref, tree) + element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url)) # no need to pull from additional remotes return True @@ -248,11 +249,8 @@ class CASCache(ArtifactCache): raise ArtifactError("Failed to pull artifact {}: {}".format( element._get_brief_display_key(), e)) from e else: - self.context.message(Message( - None, - MessageType.SKIPPED, - "Remote ({}) does not have {} cached".format( - remote.spec.url, element._get_brief_display_key()) + element.info("Remote ({}) does not have {} cached".format( + remote.spec.url, element._get_brief_display_key() )) return False @@ -273,11 +271,11 @@ class CASCache(ArtifactCache): push_remotes = [r for r in self._remotes[project] if r.spec.push] pushed = False - + display_key = element._get_brief_display_key() for remote in push_remotes: remote.init() skipped_remote = True - element.info("Pushing {} -> {}".format(element._get_brief_display_key(), remote.spec.url)) + element.status("Pushing artifact {} -> {}".format(display_key, remote.spec.url)) try: for ref in refs: @@ -354,6 +352,9 @@ class CASCache(ArtifactCache): pushed = True + if not skipped_remote: + element.info("Pushed artifact {} -> {}".format(display_key, remote.spec.url)) + except grpc.RpcError as e: if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: raise 
ArtifactError("Failed to push artifact {}: {}".format(refs, e), temporary=True) from e @@ -361,7 +362,7 @@ class CASCache(ArtifactCache): if skipped_remote: self.context.message(Message( None, - MessageType.SKIPPED, + MessageType.INFO, "Remote ({}) already has {} cached".format( remote.spec.url, element._get_brief_display_key()) )) diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py index fa595852b..d4ab1ea52 100644 --- a/buildstream/_exceptions.py +++ b/buildstream/_exceptions.py @@ -311,3 +311,12 @@ class StreamError(BstError): class AppError(BstError): def __init__(self, message, detail=None, reason=None): super().__init__(message, detail=detail, domain=ErrorDomain.APP, reason=reason) + + +# SkipJob +# +# Raised from a child process within a job when the job should be +# considered skipped by the parent process. +# +class SkipJob(Exception): + pass diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py index 165c7c83f..d77fa0c82 100644 --- a/buildstream/_scheduler/jobs/job.py +++ b/buildstream/_scheduler/jobs/job.py @@ -31,7 +31,7 @@ import multiprocessing import psutil # BuildStream toplevel imports -from ..._exceptions import ImplError, BstError, set_last_task_error +from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob from ..._message import Message, MessageType, unconditional_messages from ... import _signals, utils @@ -40,6 +40,7 @@ from ... import _signals, utils RC_OK = 0 RC_FAIL = 1 RC_PERM_FAIL = 2 +RC_SKIPPED = 3 # Used to distinguish between status messages and return values @@ -117,7 +118,7 @@ class Job(): self._max_retries = max_retries # Maximum number of automatic retries self._result = None # Return value of child action in the parent self._tries = 0 # Try count, for retryable jobs - + self._skipped_flag = False # Indicate whether the job was skipped. # If False, a retry will not be attempted regardless of whether _tries is less than _max_retries. 
# self._retry_flag = True @@ -275,6 +276,14 @@ class Job(): def set_task_id(self, task_id): self._task_id = task_id + # skipped + # + # Returns: + # bool: True if the job was skipped while processing. + @property + def skipped(self): + return self._skipped_flag + ####################################################### # Abstract Methods # ####################################################### @@ -396,6 +405,13 @@ class Job(): try: # Try the task action result = self.child_process() + except SkipJob as e: + elapsed = datetime.datetime.now() - starttime + self.message(MessageType.SKIPPED, str(e), + elapsed=elapsed, logfile=filename) + + # Alert parent of skip by return code + self._child_shutdown(RC_SKIPPED) except BstError as e: elapsed = datetime.datetime.now() - starttime self._retry_flag = e.temporary @@ -543,14 +559,18 @@ class Job(): # We don't want to retry if we got OK or a permanent fail. # This is set in _child_action but must also be set for the parent. # - self._retry_flag = returncode not in (RC_OK, RC_PERM_FAIL) + self._retry_flag = returncode == RC_FAIL + + # Set the flag to alert Queue that this job skipped. 
+ self._skipped_flag = returncode == RC_SKIPPED if self._retry_flag and (self._tries <= self._max_retries) and not self._scheduler.terminated: self.spawn() return - self.parent_complete(returncode == RC_OK, self._result) - self._scheduler.job_completed(self, returncode == RC_OK) + success = returncode in (RC_OK, RC_SKIPPED) + self.parent_complete(success, self._result) + self._scheduler.job_completed(self, success) # _parent_process_envelope() # diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py index d3d2fad3e..90e3ad792 100644 --- a/buildstream/_scheduler/queues/buildqueue.py +++ b/buildstream/_scheduler/queues/buildqueue.py @@ -77,5 +77,3 @@ class BuildQueue(Queue): # This has to be done after _assemble_done, such that the # element may register its cache key as required self._check_cache_size(job, element, result) - - return True diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py index 265890b7a..114790c05 100644 --- a/buildstream/_scheduler/queues/fetchqueue.py +++ b/buildstream/_scheduler/queues/fetchqueue.py @@ -70,11 +70,9 @@ class FetchQueue(Queue): def done(self, _, element, result, success): if not success: - return False + return element._update_state() # Successful fetch, we must be CACHED now assert element._get_consistency() == Consistency.CACHED - - return True diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py index e18967cf4..2842c5e21 100644 --- a/buildstream/_scheduler/queues/pullqueue.py +++ b/buildstream/_scheduler/queues/pullqueue.py @@ -21,6 +21,7 @@ # Local imports from . 
import Queue, QueueStatus from ..resources import ResourceType +from ..._exceptions import SkipJob # A queue which pulls element artifacts @@ -33,7 +34,8 @@ class PullQueue(Queue): def process(self, element): # returns whether an artifact was downloaded or not - return element._pull() + if not element._pull(): + raise SkipJob(self.action_name) def status(self, element): # state of dependencies may have changed, recalculate element state @@ -63,7 +65,3 @@ class PullQueue(Queue): # do not get an artifact size from pull jobs, we have to # actually check the cache size. self._scheduler.check_cache_size() - - # Element._pull() returns True if it downloaded an artifact, - # here we want to appear skipped if we did not download. - return result diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py index 568e053d6..35532d23d 100644 --- a/buildstream/_scheduler/queues/pushqueue.py +++ b/buildstream/_scheduler/queues/pushqueue.py @@ -21,6 +21,7 @@ # Local imports from . import Queue, QueueStatus from ..resources import ResourceType +from ..._exceptions import SkipJob # A queue which pushes element artifacts @@ -33,20 +34,11 @@ class PushQueue(Queue): def process(self, element): # returns whether an artifact was uploaded or not - return element._push() + if not element._push(): + raise SkipJob(self.action_name) def status(self, element): if element._skip_push(): return QueueStatus.SKIP return QueueStatus.READY - - def done(self, _, element, result, success): - - if not success: - return False - - # Element._push() returns True if it uploaded an artifact, - # here we want to appear skipped if the remote already had - # the artifact. 
- return result diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 2f875881f..15467ca67 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -136,10 +136,6 @@ class Queue(): # success (bool): True if the process() implementation did not # raise any exception # - # Returns: - # (bool): True if the element should appear to be processsed, - # Otherwise False will count the element as "skipped" - # def done(self, job, element, result, success): pass @@ -305,8 +301,7 @@ class Queue(): # and determine if it should be considered as processed # or skipped. try: - processed = self.done(job, element, result, success) - + self.done(job, element, result, success) except BstError as e: # Report error and mark as failed @@ -335,7 +330,7 @@ class Queue(): # if success: self._done_queue.append(job) - if processed: + if not job.skipped: self.processed_elements.append(element) else: self.skipped_elements.append(element) diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py index f443df3be..133655e14 100644 --- a/buildstream/_scheduler/queues/trackqueue.py +++ b/buildstream/_scheduler/queues/trackqueue.py @@ -51,18 +51,11 @@ class TrackQueue(Queue): def done(self, _, element, result, success): if not success: - return False - - changed = False + return # Set the new refs in the main process one by one as they complete for unique_id, new_ref in result: source = _plugin_lookup(unique_id) - # We appear processed if at least one source has changed - if source._save_ref(new_ref): - changed = True + source._save_ref(new_ref) element._tracking_done() - - # We'll appear as a skipped element if tracking resulted in no change - return changed diff --git a/buildstream/element.py b/buildstream/element.py index 6b0a728e6..260762cf1 100644 --- a/buildstream/element.py +++ b/buildstream/element.py @@ -1672,8 +1672,6 @@ class Element(Plugin): return False # 
Notify successfull download - display_key = self._get_brief_display_key() - self.info("Downloaded artifact {}".format(display_key)) return True # _skip_push(): @@ -1712,16 +1710,13 @@ class Element(Plugin): self.warn("Not pushing tainted artifact.") return False - display_key = self._get_brief_display_key() - with self.timed_activity("Pushing artifact {}".format(display_key)): - # Push all keys used for local commit - pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit()) - if not pushed: - return False + # Push all keys used for local commit + pushed = self.__artifacts.push(self, self.__get_cache_keys_for_commit()) + if not pushed: + return False - # Notify successful upload - self.info("Pushed artifact {}".format(display_key)) - return True + # Notify successful upload + return True # _shell(): # diff --git a/tests/frontend/pull.py b/tests/frontend/pull.py index ed9a9643e..c883e2030 100644 --- a/tests/frontend/pull.py +++ b/tests/frontend/pull.py @@ -356,4 +356,5 @@ def test_pull_missing_notifies_user(caplog, cli, tmpdir, datafiles): assert not result.get_pulled_elements(), \ "No elements should have been pulled since the cache was empty" - assert "SKIPPED Remote ({}) does not have".format(share.repo) in result.stderr + assert "INFO Remote ({}) does not have".format(share.repo) in result.stderr + assert "SKIPPED Pull" in result.stderr diff --git a/tests/frontend/push.py b/tests/frontend/push.py index 6ee301e6d..61259c521 100644 --- a/tests/frontend/push.py +++ b/tests/frontend/push.py @@ -362,3 +362,26 @@ def test_push_cross_junction(cli, tmpdir, datafiles): cache_key = cli.get_element_key(project, 'junction.bst:import-etc.bst') assert share.has_artifact('subtest', 'import-etc.bst', cache_key) + + +@pytest.mark.datafiles(DATA_DIR) +def test_push_already_cached(caplog, cli, tmpdir, datafiles): + project = os.path.join(datafiles.dirname, datafiles.basename) + caplog.set_level(1) + + with create_artifact_share(os.path.join(str(tmpdir), 
'artifactshare')) as share: + + cli.configure({ + 'artifacts': {'url': share.repo, 'push': True} + }) + result = cli.run(project=project, args=['build', 'target.bst']) + + result.assert_success() + assert "SKIPPED Push" not in result.stderr + + result = cli.run(project=project, args=['push', 'target.bst']) + + result.assert_success() + assert not result.get_pushed_elements(), "No elements should have been pushed since the cache was populated" + assert "INFO Remote ({}) already has ".format(share.repo) in result.stderr + assert "SKIPPED Push" in result.stderr diff --git a/tests/testutils/runcli.py b/tests/testutils/runcli.py index 8cd5bcb75..3535e94ea 100644 --- a/tests/testutils/runcli.py +++ b/tests/testutils/runcli.py @@ -178,7 +178,7 @@ class Result(): return list(pushed) def get_pulled_elements(self): - pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Downloaded artifact', self.stderr) + pulled = re.findall(r'\[\s*pull:(\S+)\s*\]\s*INFO\s*Pulled artifact', self.stderr) if pulled is None: return [] |