diff options
author | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-09-27 14:51:53 +0100 |
---|---|---|
committer | Tom Pollard <tom.pollard@codethink.co.uk> | 2019-10-02 15:40:19 +0100 |
commit | 5cdf740edea883d302774d6c136d107df6dc4b14 (patch) | |
tree | 1ec42ab4adc49e6f5e23b9c8f67cf4963a743ce7 | |
parent | 1142484eca2df3ec966e5f6147554cb2a5346b1e (diff) | |
download | buildstream-5cdf740edea883d302774d6c136d107df6dc4b14.tar.gz |
Add len of session/total elements members to Stream
-rw-r--r-- | src/buildstream/_frontend/status.py | 4 | ||||
-rw-r--r-- | src/buildstream/_frontend/widget.py | 4 | ||||
-rw-r--r-- | src/buildstream/_scheduler/scheduler.py | 5 | ||||
-rw-r--r-- | src/buildstream/_stream.py | 19 |
4 files changed, 23 insertions, 9 deletions
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py index a204bd9ac..578298df8 100644 --- a/src/buildstream/_frontend/status.py +++ b/src/buildstream/_frontend/status.py @@ -373,8 +373,8 @@ class _StatusHeader(): # # ========= 00:00:00 project-name (143/387) ========= # - session = str(len(self._stream.session_elements)) - total = str(len(self._stream.total_elements)) + session = self._stream.len_session_elements + total = self._stream.len_total_elements size = 0 text = '' diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py index 181ee7d2e..d40b87cd9 100644 --- a/src/buildstream/_frontend/widget.py +++ b/src/buildstream/_frontend/widget.py @@ -565,8 +565,8 @@ class LogLine(Widget): text += self.content_profile.fmt("Pipeline Summary\n", bold=True) values = OrderedDict() - values['Total'] = self.content_profile.fmt(str(len(stream.total_elements))) - values['Session'] = self.content_profile.fmt(str(len(stream.session_elements))) + values['Total'] = self.content_profile.fmt(stream.len_total_elements) + values['Session'] = self.content_profile.fmt(stream.len_session_elements) processed_maxlen = 1 skipped_maxlen = 1 diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 62c2754a6..bb3fac513 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -67,6 +67,7 @@ class NotificationType(FastEnum): EXCEPTION = "exception" START = "start" TASK_GROUPS = "task_groups" + ELEMENT_TOTALS = "element_totals" # Notification() @@ -90,7 +91,8 @@ class Notification(): message=None, task_error=None, exception=None, - task_groups=None): + task_groups=None, + element_totals=None): self.notification_type = notification_type self.full_name = full_name self.job_action = job_action @@ -101,6 +103,7 @@ class Notification(): self.task_error = task_error # Tuple of domain & reason self.exception = exception self.task_groups = task_groups + self.element_totals = element_totals # Scheduler() diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index c0bd110df..d01605eb1 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -72,6 +72,8 @@ class Stream(): self.session_elements = [] # List of elements being processed this session self.total_elements = [] # Total list of elements based on targets self.queues = [] # Queue objects + self.len_session_elements = None + self.len_total_elements = None # # Private members @@ -82,7 +84,6 @@ class Stream(): self._project = None self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state - #self._notification_pipe_front, self._notification_pipe_back = mp.Pipe() self._subprocess = None self._starttime = session_start # Synchronised with Scheduler's relative start time @@ -127,13 +128,13 @@ class Stream(): mp_context = mp.get_context(method='fork') process_name = "stream-{}".format(func.__name__) - + self._notify_front = mp.Queue() self._notify_back = mp.Queue() # Tell the scheduler to not use the notifier callback self._scheduler._notify_front = self._notify_front self._scheduler._notify_back = self._notify_back - + args = list(args) args.insert(0, self._notify_front) args.insert(0, func) @@ -1444,6 +1445,14 @@ class Stream(): else: self._session_start_callback() + # Also send through the session & total elements list lengths for status rendering + element_totals = str(len(self.session_elements)), str(len(self.total_elements)) + if self._notify_front: + self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS, + element_totals=element_totals)) + else: + self.len_session_elements, self.len_total_elements = element_totals + status = self._scheduler.run(self.queues) if status == SchedStatus.ERROR: @@ -1768,6 +1777,8 @@ class Stream(): raise SubprocessException(**notification.exception) elif notification.notification_type == NotificationType.START: self._session_start_callback() + elif notification.notification_type == NotificationType.ELEMENT_TOTALS: + self.len_session_elements, self.len_total_elements = notification.element_totals else: raise StreamError("Unrecognised notification type received") @@ -1789,7 +1800,7 @@ class Stream(): except queue.Empty: notification = None break - + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and |