summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Pollard <tom.pollard@codethink.co.uk>2019-09-27 14:51:53 +0100
committerTom Pollard <tom.pollard@codethink.co.uk>2019-10-02 15:40:19 +0100
commit5cdf740edea883d302774d6c136d107df6dc4b14 (patch)
tree1ec42ab4adc49e6f5e23b9c8f67cf4963a743ce7
parent1142484eca2df3ec966e5f6147554cb2a5346b1e (diff)
downloadbuildstream-5cdf740edea883d302774d6c136d107df6dc4b14.tar.gz
Add len of session/total elements members to Stream
-rw-r--r--src/buildstream/_frontend/status.py4
-rw-r--r--src/buildstream/_frontend/widget.py4
-rw-r--r--src/buildstream/_scheduler/scheduler.py5
-rw-r--r--src/buildstream/_stream.py19
4 files changed, 23 insertions, 9 deletions
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index a204bd9ac..578298df8 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -373,8 +373,8 @@ class _StatusHeader():
#
# ========= 00:00:00 project-name (143/387) =========
#
- session = str(len(self._stream.session_elements))
- total = str(len(self._stream.total_elements))
+ session = self._stream.len_session_elements
+ total = self._stream.len_total_elements
size = 0
text = ''
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 181ee7d2e..d40b87cd9 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -565,8 +565,8 @@ class LogLine(Widget):
text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
values = OrderedDict()
- values['Total'] = self.content_profile.fmt(str(len(stream.total_elements)))
- values['Session'] = self.content_profile.fmt(str(len(stream.session_elements)))
+ values['Total'] = self.content_profile.fmt(stream.len_total_elements)
+ values['Session'] = self.content_profile.fmt(stream.len_session_elements)
processed_maxlen = 1
skipped_maxlen = 1
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 62c2754a6..bb3fac513 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -67,6 +67,7 @@ class NotificationType(FastEnum):
EXCEPTION = "exception"
START = "start"
TASK_GROUPS = "task_groups"
+ ELEMENT_TOTALS = "element_totals"
# Notification()
@@ -90,7 +91,8 @@ class Notification():
message=None,
task_error=None,
exception=None,
- task_groups=None):
+ task_groups=None,
+ element_totals=None):
self.notification_type = notification_type
self.full_name = full_name
self.job_action = job_action
@@ -101,6 +103,7 @@ class Notification():
self.task_error = task_error # Tuple of domain & reason
self.exception = exception
self.task_groups = task_groups
+ self.element_totals = element_totals
# Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c0bd110df..d01605eb1 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -72,6 +72,8 @@ class Stream():
self.session_elements = [] # List of elements being processed this session
self.total_elements = [] # Total list of elements based on targets
self.queues = [] # Queue objects
+ self.len_session_elements = None
+ self.len_total_elements = None
#
# Private members
@@ -82,7 +84,6 @@ class Stream():
self._project = None
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
- #self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
self._subprocess = None
self._starttime = session_start # Synchronised with Scheduler's relative start time
@@ -127,13 +128,13 @@ class Stream():
mp_context = mp.get_context(method='fork')
process_name = "stream-{}".format(func.__name__)
-
+
self._notify_front = mp.Queue()
self._notify_back = mp.Queue()
# Tell the scheduler to not use the notifier callback
self._scheduler._notify_front = self._notify_front
self._scheduler._notify_back = self._notify_back
-
+
args = list(args)
args.insert(0, self._notify_front)
args.insert(0, func)
@@ -1444,6 +1445,14 @@ class Stream():
else:
self._session_start_callback()
+ # Also send through the session & total elements list lengths for status rendering
+ element_totals = str(len(self.session_elements)), str(len(self.total_elements))
+ if self._notify_front:
+ self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS,
+ element_totals=element_totals))
+ else:
+ self.len_session_elements, self.len_total_elements = element_totals
+
status = self._scheduler.run(self.queues)
if status == SchedStatus.ERROR:
@@ -1768,6 +1777,8 @@ class Stream():
raise SubprocessException(**notification.exception)
elif notification.notification_type == NotificationType.START:
self._session_start_callback()
+ elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
+ self.len_session_elements, self.len_total_elements = notification.element_totals
else:
raise StreamError("Unrecognised notification type received")
@@ -1789,7 +1800,7 @@ class Stream():
except queue.Empty:
notification = None
break
-
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and