diff options
Diffstat (limited to 'src/buildstream/_stream.py')
-rw-r--r-- | src/buildstream/_stream.py | 37 |
1 files changed, 25 insertions, 12 deletions
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index fcd40c3b4..2ad1a4fee 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -38,6 +38,7 @@ from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \ SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue from ._pipeline import Pipeline, PipelineSelection from ._profile import Topics, PROFILER +from ._state import State from .types import _KeyStrength from . import utils, _yaml, _site from . import Scope, Consistency @@ -53,17 +54,13 @@ from . import Scope, Consistency # session_start_callback (callable): A callback to invoke when the session starts # interrupt_callback (callable): A callback to invoke when we get interrupted # ticker_callback (callable): Invoked every second while running the scheduler -# job_start_callback (callable): Called when a job starts -# job_complete_callback (callable): Called when a job completes # class Stream(): def __init__(self, context, session_start, *, session_start_callback=None, interrupt_callback=None, - ticker_callback=None, - job_start_callback=None, - job_complete_callback=None): + ticker_callback=None): # # Public members @@ -76,19 +73,27 @@ class Stream(): # # Private members # - self._artifacts = context.artifactcache - self._sourcecache = context.sourcecache self._context = context + self._artifacts = None + self._sourcecache = None self._project = None self._pipeline = None - self._scheduler = Scheduler(context, session_start, + self._state = State() # Owned by Stream, used by Core to set state + self._scheduler = Scheduler(context, session_start, self._state, interrupt_callback=interrupt_callback, - ticker_callback=ticker_callback, - job_start_callback=job_start_callback, - job_complete_callback=job_complete_callback) + ticker_callback=ticker_callback) self._first_non_track_queue = None self._session_start_callback = session_start_callback + # init() + # + # Initialization of Stream that has side-effects that require it to be + # performed after the Stream is created. + # + def init(self): + self._artifacts = self._context.artifactcache + self._sourcecache = self._context.sourcecache + # cleanup() # # Cleans up application state @@ -972,6 +977,15 @@ class Stream(): self.queues = [queue] self._run() + # get_state() + # + # Get the State object owned by Stream + # + # Returns: + # State: The State object + def get_state(self): + return self._state + ############################################################# # Scheduler API forwarding # ############################################################# @@ -1213,7 +1227,6 @@ class Stream(): # def _add_queue(self, queue, *, track=False): self.queues.append(queue) - if not (track or self._first_non_track_queue): self._first_non_track_queue = queue self._first_non_track_queue.set_required_element_check() |