diff options
author | Jürg Billeter <j@bitron.ch> | 2019-05-24 08:32:59 +0200 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2019-05-24 08:32:59 +0200 |
commit | be086f83632b047bb2be0d6dd5f1ea38d156b46e (patch) | |
tree | 20d20e69b8cde58b473e036509273514e3ea3b3a | |
parent | ba3d82f923c3c9f0ea4bd93a04045d70aab48fac (diff) | |
download | buildstream-juerg/push_based_pipeline.tar.gz |
Use priority queue for ready queuejuerg/push_based_pipeline
-rw-r--r-- | src/buildstream/_scheduler/queues/buildqueue.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/fetchqueue.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/pullqueue.py | 2 | ||||
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 13 |
4 files changed, 12 insertions, 7 deletions
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py index 6ee49527d..4567b88b6 100644 --- a/src/buildstream/_scheduler/queues/buildqueue.py +++ b/src/buildstream/_scheduler/queues/buildqueue.py @@ -121,4 +121,4 @@ class BuildQueue(Queue): if element._cached_success(): self._done_queue.append(element) else: - self._ready_queue.append(element) + self._push_ready(element) diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py index d6d253eb1..c37efbf02 100644 --- a/src/buildstream/_scheduler/queues/fetchqueue.py +++ b/src/buildstream/_scheduler/queues/fetchqueue.py @@ -89,4 +89,4 @@ class FetchQueue(Queue): if element._cached() or not element._should_fetch(self._fetch_original): self._done_queue.append(element) else: - self._ready_queue.append(element) + self._push_ready(element) diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py index eaa24785f..fc290cbd1 100644 --- a/src/buildstream/_scheduler/queues/pullqueue.py +++ b/src/buildstream/_scheduler/queues/pullqueue.py @@ -69,6 +69,6 @@ class PullQueue(Queue): def on_element_can_query_cache(self, element): if element._pull_pending(): - self._ready_queue.append(element) + self._push_ready(element) else: self._done_queue.append(element) diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 31c79b005..6d6237d32 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -22,6 +22,7 @@ import os from collections import deque from enum import Enum +import heapq import traceback # Local imports @@ -74,7 +75,7 @@ class Queue(): # self._scheduler = scheduler self._resources = scheduler.resources # Shared resource pool - self._ready_queue = deque() # Ready elements + self._ready_queue = [] # Ready elements self._done_queue = deque() # Processed / Skipped elements self._max_retries = 0 @@ -152,6 +153,9 @@ class Queue(): # Scheduler / Pipeline facing APIs # ##################################################### + def _push_ready(self, elt): + heapq.heappush(self._ready_queue, (elt._unique_id, elt)) + # enqueue() # # Enqueues some elements @@ -175,7 +179,8 @@ class Queue(): if status == QueueStatus.SKIP: skip.append(elt) elif status == QueueStatus.READY: - ready.append(elt) + # ready.append(elt) + self._push_ready(elt) else: wait.append(elt) @@ -183,7 +188,7 @@ class Queue(): # ready elements into the ready_queue. self.skipped_elements.extend(skip) # Public record of skipped elements self._done_queue.extend(skip) # Elements to proceed to the next queue - self._ready_queue.extend(ready) # Elements ready to perform the job + # self._ready_queue.extend(ready) # Elements ready to perform the job # Register callbacks for the waiting elements if wait: @@ -230,7 +235,7 @@ class Queue(): reserved = self._resources.reserve(self.resources) assert reserved - element = self._ready_queue.popleft() + _, element = heapq.heappop(self._ready_queue) ready.append(element) return [ |