summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJürg Billeter <j@bitron.ch>2019-05-24 08:32:59 +0200
committerJürg Billeter <j@bitron.ch>2019-05-24 08:32:59 +0200
commitbe086f83632b047bb2be0d6dd5f1ea38d156b46e (patch)
tree20d20e69b8cde58b473e036509273514e3ea3b3a
parentba3d82f923c3c9f0ea4bd93a04045d70aab48fac (diff)
downloadbuildstream-juerg/push_based_pipeline.tar.gz
Use priority queue for ready queuejuerg/push_based_pipeline
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py2
-rw-r--r--src/buildstream/_scheduler/queues/fetchqueue.py2
-rw-r--r--src/buildstream/_scheduler/queues/pullqueue.py2
-rw-r--r--src/buildstream/_scheduler/queues/queue.py13
4 files changed, 12 insertions, 7 deletions
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index 6ee49527d..4567b88b6 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -121,4 +121,4 @@ class BuildQueue(Queue):
if element._cached_success():
self._done_queue.append(element)
else:
- self._ready_queue.append(element)
+ self._push_ready(element)
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index d6d253eb1..c37efbf02 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -89,4 +89,4 @@ class FetchQueue(Queue):
if element._cached() or not element._should_fetch(self._fetch_original):
self._done_queue.append(element)
else:
- self._ready_queue.append(element)
+ self._push_ready(element)
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index eaa24785f..fc290cbd1 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -69,6 +69,6 @@ class PullQueue(Queue):
def on_element_can_query_cache(self, element):
if element._pull_pending():
- self._ready_queue.append(element)
+ self._push_ready(element)
else:
self._done_queue.append(element)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 31c79b005..6d6237d32 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -22,6 +22,7 @@
import os
from collections import deque
from enum import Enum
+import heapq
import traceback
# Local imports
@@ -74,7 +75,7 @@ class Queue():
#
self._scheduler = scheduler
self._resources = scheduler.resources # Shared resource pool
- self._ready_queue = deque() # Ready elements
+ self._ready_queue = [] # Ready elements
self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
@@ -152,6 +153,9 @@ class Queue():
# Scheduler / Pipeline facing APIs #
#####################################################
+ def _push_ready(self, elt):
+ heapq.heappush(self._ready_queue, (elt._unique_id, elt))
+
# enqueue()
#
# Enqueues some elements
@@ -175,7 +179,8 @@ class Queue():
if status == QueueStatus.SKIP:
skip.append(elt)
elif status == QueueStatus.READY:
- ready.append(elt)
+ # ready.append(elt)
+ self._push_ready(elt)
else:
wait.append(elt)
@@ -183,7 +188,7 @@ class Queue():
# ready elements into the ready_queue.
self.skipped_elements.extend(skip) # Public record of skipped elements
self._done_queue.extend(skip) # Elements to proceed to the next queue
- self._ready_queue.extend(ready) # Elements ready to perform the job
+ # self._ready_queue.extend(ready) # Elements ready to perform the job
# Register callbacks for the waiting elements
if wait:
@@ -230,7 +235,7 @@ class Queue():
reserved = self._resources.reserve(self.resources)
assert reserved
- element = self._ready_queue.popleft()
+ _, element = heapq.heappop(self._ready_queue)
ready.append(element)
return [