From 5cdf740edea883d302774d6c136d107df6dc4b14 Mon Sep 17 00:00:00 2001 From: Tom Pollard Date: Fri, 27 Sep 2019 14:51:53 +0100 Subject: Add len of session/total elements members to Stream --- src/buildstream/_frontend/status.py | 4 ++-- src/buildstream/_frontend/widget.py | 4 ++-- src/buildstream/_scheduler/scheduler.py | 5 ++++- src/buildstream/_stream.py | 19 +++++++++++++++---- 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py index a204bd9ac..578298df8 100644 --- a/src/buildstream/_frontend/status.py +++ b/src/buildstream/_frontend/status.py @@ -373,8 +373,8 @@ class _StatusHeader(): # # ========= 00:00:00 project-name (143/387) ========= # - session = str(len(self._stream.session_elements)) - total = str(len(self._stream.total_elements)) + session = self._stream.len_session_elements + total = self._stream.len_total_elements size = 0 text = '' diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py index 181ee7d2e..d40b87cd9 100644 --- a/src/buildstream/_frontend/widget.py +++ b/src/buildstream/_frontend/widget.py @@ -565,8 +565,8 @@ class LogLine(Widget): text += self.content_profile.fmt("Pipeline Summary\n", bold=True) values = OrderedDict() - values['Total'] = self.content_profile.fmt(str(len(stream.total_elements))) - values['Session'] = self.content_profile.fmt(str(len(stream.session_elements))) + values['Total'] = self.content_profile.fmt(stream.len_total_elements) + values['Session'] = self.content_profile.fmt(stream.len_session_elements) processed_maxlen = 1 skipped_maxlen = 1 diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 62c2754a6..bb3fac513 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -67,6 +67,7 @@ class NotificationType(FastEnum): EXCEPTION = "exception" START = "start" TASK_GROUPS = "task_groups" + ELEMENT_TOTALS = "element_totals" # Notification() @@ -90,7 +91,8 @@ class Notification(): message=None, task_error=None, exception=None, - task_groups=None): + task_groups=None, + element_totals=None): self.notification_type = notification_type self.full_name = full_name self.job_action = job_action @@ -101,6 +103,7 @@ class Notification(): self.task_error = task_error # Tuple of domain & reason self.exception = exception self.task_groups = task_groups + self.element_totals = element_totals # Scheduler() diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index c0bd110df..d01605eb1 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -72,6 +72,8 @@ class Stream(): self.session_elements = [] # List of elements being processed this session self.total_elements = [] # Total list of elements based on targets self.queues = [] # Queue objects + self.len_session_elements = None + self.len_total_elements = None # # Private members @@ -82,7 +84,6 @@ class Stream(): self._project = None self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state - #self._notification_pipe_front, self._notification_pipe_back = mp.Pipe() self._subprocess = None self._starttime = session_start # Synchronised with Scheduler's relative start time @@ -127,13 +128,13 @@ class Stream(): mp_context = mp.get_context(method='fork') process_name = "stream-{}".format(func.__name__) - + self._notify_front = mp.Queue() self._notify_back = mp.Queue() # Tell the scheduler to not use the notifier callback self._scheduler._notify_front = self._notify_front self._scheduler._notify_back = self._notify_back - + args = list(args) args.insert(0, self._notify_front) args.insert(0, func) @@ -1444,6 +1445,14 @@ class Stream(): else: self._session_start_callback() + # Also send through the session & total elements list lengths for status rendering + element_totals = str(len(self.session_elements)), str(len(self.total_elements)) + if self._notify_front: + self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS, + element_totals=element_totals)) + else: + self.len_session_elements, self.len_total_elements = element_totals + status = self._scheduler.run(self.queues) if status == SchedStatus.ERROR: @@ -1768,6 +1777,8 @@ class Stream(): raise SubprocessException(**notification.exception) elif notification.notification_type == NotificationType.START: self._session_start_callback() + elif notification.notification_type == NotificationType.ELEMENT_TOTALS: + self.len_session_elements, self.len_total_elements = notification.element_totals else: raise StreamError("Unrecognised notification type received") @@ -1789,7 +1800,7 @@ class Stream(): except queue.Empty: notification = None break - + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and -- cgit v1.2.1