summaryrefslogtreecommitdiff
path: root/src/buildstream/_stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r--src/buildstream/_stream.py28
1 files changed, 16 insertions, 12 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index cb1e84f74..e9bd60244 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -307,7 +307,7 @@ class Stream:
# Enqueue elements
self._enqueue_plan(elements)
- self._run()
+ self._run(announce_session=True)
# fetch()
#
@@ -334,7 +334,7 @@ class Stream:
)
# Delegated to a shared fetch method
- self._fetch(elements)
+ self._fetch(elements, announce_session=True)
# track()
#
@@ -365,7 +365,7 @@ class Stream:
track_queue = TrackQueue(self._scheduler)
self._add_queue(track_queue, track=True)
self._enqueue_plan(elements, queue=track_queue)
- self._run()
+ self._run(announce_session=True)
# source_push()
#
@@ -407,7 +407,7 @@ class Stream:
self._add_queue(SourcePushQueue(self._scheduler))
self._enqueue_plan(elements)
- self._run()
+ self._run(announce_session=True)
# pull()
#
@@ -444,7 +444,7 @@ class Stream:
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
- self._run()
+ self._run(announce_session=True)
# push()
#
@@ -510,7 +510,7 @@ class Stream:
push_queue = ArtifactPushQueue(self._scheduler)
self._add_queue(push_queue)
self._enqueue_plan(cached_elements, queue=push_queue)
- self._run()
+ self._run(announce_session=True)
# If the user has selected to continue on error, fail the command
# and print a summary of artifacts which could not be pushed
@@ -577,7 +577,7 @@ class Stream:
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(uncached_elts)
- self._run()
+ self._run(announce_session=True)
try:
scope = {
@@ -1354,14 +1354,17 @@ class Stream:
#
# Common function for running the scheduler
#
- def _run(self):
+ # Args:
+ # announce_session (bool): Whether to announce the session in the frontend.
+ #
+ def _run(self, *, announce_session: bool = False):
# Inform the frontend of the full list of elements
# and the list of elements which will be processed in this run
#
self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
- if self._session_start_callback is not None:
+ if announce_session and self._session_start_callback is not None:
self._session_start_callback()
self._running = True
@@ -1380,9 +1383,10 @@ class Stream:
#
# Args:
# elements (list of Element): Elements to fetch
- # fetch_original (Bool): Whether to fetch original unstaged
+ # fetch_original (bool): Whether to fetch original unstaged
+ # announce_session (bool): Whether to announce the session in the frontend
#
- def _fetch(self, elements, *, fetch_original=False):
+ def _fetch(self, elements: List[Element], *, fetch_original: bool = False, announce_session: bool = False):
# Assert consistency for the fetch elements
self._pipeline.assert_consistent(elements)
@@ -1392,7 +1396,7 @@ class Stream:
self._scheduler.clear_queues()
self._add_queue(FetchQueue(self._scheduler, fetch_original=fetch_original))
self._enqueue_plan(elements)
- self._run()
+ self._run(announce_session=announce_session)
# _check_location_writable()
#