diff options
author | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2017-10-30 20:04:23 +0900 |
---|---|---|
committer | Tristan Van Berkom <tristan.vanberkom@codethink.co.uk> | 2017-10-30 20:04:23 +0900 |
commit | 051dd9ccbdd9dccc7bd4d20ab88ff96f0891e9fd (patch) | |
tree | 2ea0ba48427e00d964dbce8407e3b9338801fcd1 | |
parent | e472f5abd7f9aeff994a14e581c7a89199bd255f (diff) | |
download | buildstream-051dd9ccbdd9dccc7bd4d20ab88ff96f0891e9fd.tar.gz |
_pipeline.py: The toplevel message() function no longer takes an element.
Instead, use None as the unique ID.
This will help with messaging when we allow invoking multiple targets
and the element would have been ambiguous - this also consequently
fixes issue #137.
The reason for the hangs with #137 is because:
o When you --except a base element, reverse dependencies cannot calculate a cache key
o When you track, cache keys are intentionally reset at startup time, because
we know they are going to change
o At the end of tracking, we make one attempt to print the toplevel cache key
This operation is insanely expensive, because we never cache a cache key because
it logically cannot ever be resolved in this situation.
This fix is basically a workaround to the above.
-rw-r--r-- | buildstream/_pipeline.py | 67 |
1 files changed, 32 insertions, 35 deletions
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index eecb88f6a..dd368d8fd 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -181,7 +181,7 @@ class Pipeline(): remote_ticker(self.artifacts.artifact_pull) self.artifacts.fetch_remote_refs() except _ArtifactError: - self.message(self.target, MessageType.WARN, "Failed to fetch remote refs") + self.message(MessageType.WARN, "Failed to fetch remote refs") self.artifacts.set_offline() for element in self.dependencies(Scope.ALL): @@ -227,7 +227,7 @@ class Pipeline(): "Try tracking these elements first with `bst track`\n\n" for element in inconsistent: detail += " " + element.name + "\n" - self.message(self.target, MessageType.ERROR, "Inconsistent pipeline", detail=detail) + self.message(MessageType.ERROR, "Inconsistent pipeline", detail=detail) raise PipelineError() # Generator function to iterate over only the elements @@ -241,13 +241,10 @@ class Pipeline(): # Local message propagator # - def message(self, plugin, message_type, message, **kwargs): + def message(self, message_type, message, **kwargs): args = dict(kwargs) self.context._message( - Message(plugin._get_unique_id(), - message_type, - message, - **args)) + Message(None, message_type, message, **args)) # Internal: Instantiates plugin-provided Element and Source instances # from MetaElement and MetaSource objects @@ -292,14 +289,14 @@ class Pipeline(): def can_push_remote_artifact_cache(self): if self.artifacts.can_push(): starttime = datetime.datetime.now() - self.message(self.target, MessageType.START, "Checking connectivity to remote artifact cache") + self.message(MessageType.START, "Checking connectivity to remote artifact cache") try: self.artifacts.preflight() except _ArtifactError as e: - self.message(self.target, MessageType.WARN, str(e), + self.message(MessageType.WARN, str(e), elapsed=datetime.datetime.now() - starttime) return False - self.message(self.target, MessageType.SUCCESS, "Connectivity OK", + self.message(MessageType.SUCCESS, "Connectivity OK", elapsed=datetime.datetime.now() - starttime) return True else: @@ -329,20 +326,20 @@ class Pipeline(): track.enqueue(dependencies) self.session_elements = len(dependencies) - self.message(self.target, MessageType.START, "Starting track") + self.message(MessageType.START, "Starting track") elapsed, status = scheduler.run([track]) changed = len(track.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Track failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Track failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Terminated after updating {} source references".format(changed), elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, + self.message(MessageType.SUCCESS, "Updated {} source references".format(changed), elapsed=elapsed) @@ -381,20 +378,20 @@ class Pipeline(): fetch.enqueue(plan) queues = [fetch] - self.message(self.target, MessageType.START, "Fetching {} elements".format(len(plan))) + self.message(MessageType.START, "Fetching {} elements".format(len(plan))) elapsed, status = scheduler.run(queues) fetched = len(fetch.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Fetch failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Fetch failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Terminated after fetching {} elements".format(fetched), elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, + self.message(MessageType.SUCCESS, "Fetched {} elements".format(fetched), elapsed=elapsed) @@ -410,7 +407,7 @@ class Pipeline(): # def build(self, scheduler, build_all, track_first): if len(self.unused_workspaces) > 0: - self.message(self.target, MessageType.WARN, "Unused workspaces", + self.message(MessageType.WARN, "Unused workspaces", detail="\n".join([el + "-" + str(src) for el, src, _ in self.unused_workspaces])) @@ -445,18 +442,18 @@ class Pipeline(): self.session_elements = len(plan) - self.message(self.target, MessageType.START, "Starting build") + self.message(MessageType.START, "Starting build") elapsed, status = scheduler.run(queues) built = len(build.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Build failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Build failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, "Terminated", elapsed=elapsed) + self.message(MessageType.WARN, "Terminated", elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, "Build Complete", elapsed=elapsed) + self.message(MessageType.SUCCESS, "Build Complete", elapsed=elapsed) # checkout() # @@ -484,7 +481,7 @@ class Pipeline(): # commands for cross-build artifacts. can_integrate = (self.context.host_arch == self.context.target_arch) if not can_integrate: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Host-incompatible checkout -- no integration commands can be run") # Stage deps into a temporary sandbox first @@ -548,15 +545,15 @@ class Pipeline(): fetched = len(fetch.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Tracking failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Tracking failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Terminated after fetching {} elements".format(fetched), elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, + self.message(MessageType.SUCCESS, "Fetched {} elements".format(fetched), elapsed=elapsed) if not no_checkout: @@ -657,20 +654,20 @@ class Pipeline(): pull.enqueue(plan) queues = [pull] - self.message(self.target, MessageType.START, "Pulling {} artifacts".format(len(plan))) + self.message(MessageType.START, "Pulling {} artifacts".format(len(plan))) elapsed, status = scheduler.run(queues) pulled = len(pull.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Pull failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Pull failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Terminated after pulling {} elements".format(pulled), elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, + self.message(MessageType.SUCCESS, "Pulled {} complete".format(pulled), elapsed=elapsed) @@ -695,20 +692,20 @@ class Pipeline(): push.enqueue(plan) queues = [push] - self.message(self.target, MessageType.START, "Pushing {} artifacts".format(len(plan))) + self.message(MessageType.START, "Pushing {} artifacts".format(len(plan))) elapsed, status = scheduler.run(queues) pushed = len(push.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Push failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Push failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Terminated after pushing {} elements".format(pushed), elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, + self.message(MessageType.SUCCESS, "Pushed {} complete".format(pushed), elapsed=elapsed) |