summaryrefslogtreecommitdiff
path: root/src/buildstream/_stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r--src/buildstream/_stream.py37
1 files changed, 25 insertions, 12 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index fcd40c3b4..2ad1a4fee 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -38,6 +38,7 @@ from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
from ._pipeline import Pipeline, PipelineSelection
from ._profile import Topics, PROFILER
+from ._state import State
from .types import _KeyStrength
from . import utils, _yaml, _site
from . import Scope, Consistency
@@ -53,17 +54,13 @@ from . import Scope, Consistency
# session_start_callback (callable): A callback to invoke when the session starts
# interrupt_callback (callable): A callback to invoke when we get interrupted
# ticker_callback (callable): Invoked every second while running the scheduler
-# job_start_callback (callable): Called when a job starts
-# job_complete_callback (callable): Called when a job completes
#
class Stream():
def __init__(self, context, session_start, *,
session_start_callback=None,
interrupt_callback=None,
- ticker_callback=None,
- job_start_callback=None,
- job_complete_callback=None):
+ ticker_callback=None):
#
# Public members
@@ -76,19 +73,27 @@ class Stream():
#
# Private members
#
- self._artifacts = context.artifactcache
- self._sourcecache = context.sourcecache
self._context = context
+ self._artifacts = None
+ self._sourcecache = None
self._project = None
self._pipeline = None
- self._scheduler = Scheduler(context, session_start,
+ self._state = State() # Owned by Stream, used by Core to set state
+ self._scheduler = Scheduler(context, session_start, self._state,
interrupt_callback=interrupt_callback,
- ticker_callback=ticker_callback,
- job_start_callback=job_start_callback,
- job_complete_callback=job_complete_callback)
+ ticker_callback=ticker_callback)
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
+ # init()
+ #
+ # Initialization of Stream that has side-effects that require it to be
+ # performed after the Stream is created.
+ #
+ def init(self):
+ self._artifacts = self._context.artifactcache
+ self._sourcecache = self._context.sourcecache
+
# cleanup()
#
# Cleans up application state
@@ -972,6 +977,15 @@ class Stream():
self.queues = [queue]
self._run()
+ # get_state()
+ #
+ # Get the State object owned by Stream
+ #
+ # Returns:
+ # State: The State object
+ def get_state(self):
+ return self._state
+
#############################################################
# Scheduler API forwarding #
#############################################################
@@ -1213,7 +1227,6 @@ class Stream():
#
def _add_queue(self, queue, *, track=False):
self.queues.append(queue)
-
if not (track or self._first_non_track_queue):
self._first_non_track_queue = queue
self._first_non_track_queue.set_required_element_check()