diff options
author | James Ennis <james.ennis@codethink.co.uk> | 2019-05-28 17:31:07 +0100 |
---|---|---|
committer | James Ennis <james.ennis@codethink.co.uk> | 2019-06-07 14:47:16 +0100 |
commit | 2908ca8f2b417f7b0824a934770619d0f8cf8909 (patch) | |
tree | b11b6021fd5dca76b525918c7cf8c1c1d3f8da51 /src | |
parent | 14514683bad92f9947540784572173036310ce29 (diff) | |
download | buildstream-2908ca8f2b417f7b0824a934770619d0f8cf8909.tar.gz |
queue.py: Use heapq for the ready queuejennis/push_based_pipeline
This patch includes setting a _depth to each element
once the pipeline has been sorted. This is necessary
as we need to store elements in the heapq sorted by
their depth.
Diffstat (limited to 'src')
-rw-r--r-- | src/buildstream/_pipeline.py | 5 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 7 | ||||
-rw-r--r-- | src/buildstream/element.py | 12 |
3 files changed, 21 insertions, 3 deletions
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py index c176b82f6..d44813348 100644 --- a/src/buildstream/_pipeline.py +++ b/src/buildstream/_pipeline.py @@ -513,4 +513,9 @@ class _Planner(): self.plan_element(root, 0) depth_sorted = sorted(self.depth_map.items(), key=itemgetter(1), reverse=True) + + # Set the depth of each element + for index, item in enumerate(depth_sorted): + item[0]._set_depth(index) + return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()] diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 951b28c2a..9a07f633c 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -22,6 +22,7 @@ import os from collections import deque from enum import Enum +import heapq import traceback # Local imports @@ -73,7 +74,7 @@ class Queue(): # self._scheduler = scheduler self._resources = scheduler.resources # Shared resource pool - self._ready_queue = deque() # Ready elements + self._ready_queue = [] # Ready elements self._done_queue = deque() # Processed / Skipped elements self._max_retries = 0 @@ -210,7 +211,7 @@ class Queue(): if not reserved: break - element = self._ready_queue.popleft() + _, element = heapq.heappop(self._ready_queue) ready.append(element) return [ @@ -355,7 +356,7 @@ class Queue(): self._done_queue.append(element) # Elements to proceed to the next queue elif status == QueueStatus.READY: # Push elements which are ready to be processed immediately into the queue - self._ready_queue.append(element) + heapq.heappush(self._ready_queue, (element._depth, element)) else: # Register a queue specific callback for pending elements self.register_pending_element(element) diff --git a/src/buildstream/element.py b/src/buildstream/element.py index 355773f76..84c8f20ff 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -244,6 +244,8 @@ class Element(Plugin): self.__can_query_cache_callback = None # Callback to PullQueue/FetchQueue self.__buildable_callback = None # Callback to BuildQueue + self._depth = None # Depth of Element in its current dependency graph + # Ensure we have loaded this class's defaults self.__init_defaults(project, plugin_conf, meta.kind, meta.is_junction) @@ -2310,6 +2312,16 @@ class Element(Plugin): def _set_buildable_callback(self, callback): self.__buildable_callback = callback + # _set_depth() + # + # Set the depth of the Element. + # + # The depth represents the position of the Element within the current + # session's dependency graph. A depth of zero represents the bottommost element. + # + def _set_depth(self, depth): + self._depth = depth + ############################################################# # Private Local Methods # ############################################################# |