diff options
-rw-r--r-- | buildstream/_pipeline.py | 67 |
1 files changed, 32 insertions, 35 deletions
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py index eecb88f6a..dd368d8fd 100644 --- a/buildstream/_pipeline.py +++ b/buildstream/_pipeline.py @@ -181,7 +181,7 @@ class Pipeline(): remote_ticker(self.artifacts.artifact_pull) self.artifacts.fetch_remote_refs() except _ArtifactError: - self.message(self.target, MessageType.WARN, "Failed to fetch remote refs") + self.message(MessageType.WARN, "Failed to fetch remote refs") self.artifacts.set_offline() for element in self.dependencies(Scope.ALL): @@ -227,7 +227,7 @@ class Pipeline(): "Try tracking these elements first with `bst track`\n\n" for element in inconsistent: detail += " " + element.name + "\n" - self.message(self.target, MessageType.ERROR, "Inconsistent pipeline", detail=detail) + self.message(MessageType.ERROR, "Inconsistent pipeline", detail=detail) raise PipelineError() # Generator function to iterate over only the elements @@ -241,13 +241,10 @@ class Pipeline(): # Local message propagator # - def message(self, plugin, message_type, message, **kwargs): + def message(self, message_type, message, **kwargs): args = dict(kwargs) self.context._message( - Message(plugin._get_unique_id(), - message_type, - message, - **args)) + Message(None, message_type, message, **args)) # Internal: Instantiates plugin-provided Element and Source instances # from MetaElement and MetaSource objects @@ -292,14 +289,14 @@ class Pipeline(): def can_push_remote_artifact_cache(self): if self.artifacts.can_push(): starttime = datetime.datetime.now() - self.message(self.target, MessageType.START, "Checking connectivity to remote artifact cache") + self.message(MessageType.START, "Checking connectivity to remote artifact cache") try: self.artifacts.preflight() except _ArtifactError as e: - self.message(self.target, MessageType.WARN, str(e), + self.message(MessageType.WARN, str(e), elapsed=datetime.datetime.now() - starttime) return False - self.message(self.target, MessageType.SUCCESS, "Connectivity OK", + self.message(MessageType.SUCCESS, "Connectivity OK", elapsed=datetime.datetime.now() - starttime) return True else: @@ -329,20 +326,20 @@ class Pipeline(): track.enqueue(dependencies) self.session_elements = len(dependencies) - self.message(self.target, MessageType.START, "Starting track") + self.message(MessageType.START, "Starting track") elapsed, status = scheduler.run([track]) changed = len(track.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Track failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Track failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Terminated after updating {} source references".format(changed), elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, + self.message(MessageType.SUCCESS, "Updated {} source references".format(changed), elapsed=elapsed) @@ -381,20 +378,20 @@ class Pipeline(): fetch.enqueue(plan) queues = [fetch] - self.message(self.target, MessageType.START, "Fetching {} elements".format(len(plan))) + self.message(MessageType.START, "Fetching {} elements".format(len(plan))) elapsed, status = scheduler.run(queues) fetched = len(fetch.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Fetch failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Fetch failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Terminated after fetching {} elements".format(fetched), elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, + self.message(MessageType.SUCCESS, "Fetched {} elements".format(fetched), elapsed=elapsed) @@ -410,7 +407,7 @@ class Pipeline(): # def build(self, scheduler, build_all, track_first): if len(self.unused_workspaces) > 0: - self.message(self.target, MessageType.WARN, "Unused workspaces", + self.message(MessageType.WARN, "Unused workspaces", detail="\n".join([el + "-" + str(src) for el, src, _ in self.unused_workspaces])) @@ -445,18 +442,18 @@ class Pipeline(): self.session_elements = len(plan) - self.message(self.target, MessageType.START, "Starting build") + self.message(MessageType.START, "Starting build") elapsed, status = scheduler.run(queues) built = len(build.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Build failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Build failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, "Terminated", elapsed=elapsed) + self.message(MessageType.WARN, "Terminated", elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, "Build Complete", elapsed=elapsed) + self.message(MessageType.SUCCESS, "Build Complete", elapsed=elapsed) # checkout() # @@ -484,7 +481,7 @@ class Pipeline(): # commands for cross-build artifacts. can_integrate = (self.context.host_arch == self.context.target_arch) if not can_integrate: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Host-incompatible checkout -- no integration commands can be run") # Stage deps into a temporary sandbox first @@ -548,15 +545,15 @@ class Pipeline(): fetched = len(fetch.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Tracking failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Tracking failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Terminated after fetching {} elements".format(fetched), elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, + self.message(MessageType.SUCCESS, "Fetched {} elements".format(fetched), elapsed=elapsed) if not no_checkout: @@ -657,20 +654,20 @@ class Pipeline(): pull.enqueue(plan) queues = [pull] - self.message(self.target, MessageType.START, "Pulling {} artifacts".format(len(plan))) + self.message(MessageType.START, "Pulling {} artifacts".format(len(plan))) elapsed, status = scheduler.run(queues) pulled = len(pull.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Pull failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Pull failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Terminated after pulling {} elements".format(pulled), elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, + self.message(MessageType.SUCCESS, "Pulled {} complete".format(pulled), elapsed=elapsed) @@ -695,20 +692,20 @@ class Pipeline(): push.enqueue(plan) queues = [push] - self.message(self.target, MessageType.START, "Pushing {} artifacts".format(len(plan))) + self.message(MessageType.START, "Pushing {} artifacts".format(len(plan))) elapsed, status = scheduler.run(queues) pushed = len(push.processed_elements) if status == SchedStatus.ERROR: - self.message(self.target, MessageType.FAIL, "Push failed", elapsed=elapsed) + self.message(MessageType.FAIL, "Push failed", elapsed=elapsed) raise PipelineError() elif status == SchedStatus.TERMINATED: - self.message(self.target, MessageType.WARN, + self.message(MessageType.WARN, "Terminated after pushing {} elements".format(pushed), elapsed=elapsed) raise PipelineError() else: - self.message(self.target, MessageType.SUCCESS, + self.message(MessageType.SUCCESS, "Pushed {} complete".format(pushed), elapsed=elapsed) |