summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Ennis <james.ennis@codethink.co.uk>2019-05-28 17:31:07 +0100
committerJames Ennis <james.ennis@codethink.co.uk>2019-06-07 14:47:16 +0100
commit2908ca8f2b417f7b0824a934770619d0f8cf8909 (patch)
treeb11b6021fd5dca76b525918c7cf8c1c1d3f8da51
parent14514683bad92f9947540784572173036310ce29 (diff)
downloadbuildstream-jennis/push_based_pipeline.tar.gz
queue.py: Use heapq for the ready queuejennis/push_based_pipeline
This patch includes setting a _depth to each element once the pipeline has been sorted. This is necessary as we need to store elements in the heapq sorted by their depth.
-rw-r--r--src/buildstream/_pipeline.py5
-rw-r--r--src/buildstream/_scheduler/queues/queue.py7
-rw-r--r--src/buildstream/element.py12
3 files changed, 21 insertions, 3 deletions
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index c176b82f6..d44813348 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -513,4 +513,9 @@ class _Planner():
self.plan_element(root, 0)
depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True)
+
+ # Set the depth of each element
+ for index, item in enumerate(depth_sorted):
+ item[0]._set_depth(index)
+
return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 951b28c2a..9a07f633c 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -22,6 +22,7 @@
import os
from collections import deque
from enum import Enum
+import heapq
import traceback
# Local imports
@@ -73,7 +74,7 @@ class Queue():
#
self._scheduler = scheduler
self._resources = scheduler.resources # Shared resource pool
- self._ready_queue = deque() # Ready elements
+ self._ready_queue = [] # Ready elements
self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
@@ -210,7 +211,7 @@ class Queue():
if not reserved:
break
- element = self._ready_queue.popleft()
+ _, element = heapq.heappop(self._ready_queue)
ready.append(element)
return [
@@ -355,7 +356,7 @@ class Queue():
self._done_queue.append(element) # Elements to proceed to the next queue
elif status == QueueStatus.READY:
# Push elements which are ready to be processed immediately into the queue
- self._ready_queue.append(element)
+ heapq.heappush(self._ready_queue, (element._depth, element))
else:
# Register a queue specific callback for pending elements
self.register_pending_element(element)
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 355773f76..84c8f20ff 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -244,6 +244,8 @@ class Element(Plugin):
self.__can_query_cache_callback = None # Callback to PullQueue/FetchQueue
self.__buildable_callback = None # Callback to BuildQueue
+ self._depth = None # Depth of Element in its current dependency graph
+
# Ensure we have loaded this class's defaults
self.__init_defaults(project, plugin_conf, meta.kind, meta.is_junction)
@@ -2310,6 +2312,16 @@ class Element(Plugin):
def _set_buildable_callback(self, callback):
self.__buildable_callback = callback
+ # _set_depth()
+ #
+ # Set the depth of the Element.
+ #
+ # The depth represents the position of the Element within the current
+ # session's dependency graph. A depth of zero represents the bottommost element.
+ #
+ def _set_depth(self, depth):
+ self._depth = depth
+
#############################################################
# Private Local Methods #
#############################################################