summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJames Ennis <james.ennis@codethink.co.uk>2019-05-14 11:49:58 +0100
committerJames Ennis <james.ennis@codethink.co.uk>2019-06-07 14:47:16 +0100
commit14514683bad92f9947540784572173036310ce29 (patch)
tree4d148c74710806eb3bbf6e1409787a9352d38b0a /src
parent3427cea53ed9f4c6517606fd13abf2d06352f8d2 (diff)
downloadbuildstream-14514683bad92f9947540784572173036310ce29.tar.gz
queue.py: Push-based queues
* Queue.enqueue() and Queue.harvest_jobs() now exhibit push-based behaviour. Although most of the logic from Queue.enqueue() has been moved to Queue._enqueue_element() * QueueStatus.WAIT has been replaced with QueueStatus.PENDING to reflect the new push-based nature of the queues * There now exists a virtual method in Queue: register_pending_element which is used to register am element which is not immediately ready to be processed in the queue with specific callbacks. These callbacks will enqueue the element when called.
Diffstat (limited to 'src')
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py12
-rw-r--r--src/buildstream/_scheduler/queues/fetchqueue.py12
-rw-r--r--src/buildstream/_scheduler/queues/pullqueue.py13
-rw-r--r--src/buildstream/_scheduler/queues/queue.py107
-rw-r--r--src/buildstream/_stream.py1
5 files changed, 90 insertions, 55 deletions
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index dc82f54ec..b280661cc 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -70,16 +70,11 @@ class BuildQueue(Queue):
return element._assemble()
def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
-
if element._cached_success():
return QueueStatus.SKIP
if not element._buildable():
- return QueueStatus.WAIT
+ return QueueStatus.PENDING
return QueueStatus.READY
@@ -115,3 +110,8 @@ class BuildQueue(Queue):
#
if status is JobStatus.OK:
self._check_cache_size(job, element, result)
+
+ def register_pending_element(self, element):
+ # Set a "buildable" callback for an element not yet ready
+ # to be processed in the build queue.
+ element._set_buildable_callback(self._enqueue_element)
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 90db77f42..bbb3b3d78 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -45,15 +45,10 @@ class FetchQueue(Queue):
element._fetch(fetch_original=self._fetch_original)
def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
-
# Optionally skip elements that are already in the artifact cache
if self._skip_cached:
if not element._can_query_cache():
- return QueueStatus.WAIT
+ return QueueStatus.PENDING
if element._cached():
return QueueStatus.SKIP
@@ -78,3 +73,8 @@ class FetchQueue(Queue):
assert element._get_consistency() == Consistency.CACHED
else:
assert element._source_cached()
+
+ def register_pending_element(self, element):
+ # Set a "can_query_cache" callback for an element not yet ready
+ # to be processed in the fetch queue.
+ element._set_can_query_cache_callback(self._enqueue_element)
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 374181cda..245293342 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -39,13 +39,8 @@ class PullQueue(Queue):
raise SkipJob(self.action_name)
def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
-
if not element._can_query_cache():
- return QueueStatus.WAIT
+ return QueueStatus.PENDING
if element._pull_pending():
return QueueStatus.READY
@@ -64,3 +59,9 @@ class PullQueue(Queue):
# actually check the cache size.
if status is JobStatus.OK:
self._scheduler.check_cache_size()
+
+ def register_pending_element(self, element):
+ # Set a "can_query_cache"_callback for an element which is not
+ # immediately ready to query the artifact cache so that it
+ # may be pulled.
+ element._set_can_query_cache_callback(self._enqueue_element)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 7740896b5..951b28c2a 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -29,7 +29,7 @@ from ..jobs import ElementJob, JobStatus
from ..resources import ResourceType
# BuildStream toplevel imports
-from ..._exceptions import BstError, set_last_task_error
+from ..._exceptions import BstError, ImplError, set_last_task_error
from ..._message import Message, MessageType
@@ -37,8 +37,8 @@ from ..._message import Message, MessageType
#
#
class QueueStatus(Enum):
- # The element is waiting for dependencies.
- WAIT = 1
+ # The element is not yet ready to be processed in the queue.
+ PENDING = 1
# The element can skip this queue.
SKIP = 2
@@ -73,10 +73,12 @@ class Queue():
#
self._scheduler = scheduler
self._resources = scheduler.resources # Shared resource pool
- self._wait_queue = deque() # Ready / Waiting elements
+ self._ready_queue = deque() # Ready elements
self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
+ self._required_element_check = False # Whether we should check that elements are required before enqueuing
+
# Assert the subclass has setup class data
assert self.action_name is not None
assert self.complete_name is not None
@@ -105,7 +107,9 @@ class Queue():
# status()
#
- # Abstract method for reporting the status of an element.
+ # Abstract method for reporting the immediate status of an element. The status
+ # determines whether an element can/cannot be pushed into the queue, or even
+ # skip the queue entirely, when called.
#
# Args:
# element (Element): An element to process
@@ -130,6 +134,23 @@ class Queue():
pass
#####################################################
+ # Virtual Methods for Queue implementations #
+ #####################################################
+
+ # register_pending_element()
+ #
+ # Virtual method for registering a queue specific callback
+ # to an Element which is not immediately ready to advance
+ # to the next queue
+ #
+ # Args:
+ # element (Element): The element waiting to be pushed into the queue
+ #
+ def register_pending_element(self, element):
+ raise ImplError("Queue type: {} does not implement register_pending_element()"
+ .format(self.action_name))
+
+ #####################################################
# Scheduler / Pipeline facing APIs #
#####################################################
@@ -144,18 +165,12 @@ class Queue():
if not elts:
return
- # Place skipped elements on the done queue right away.
- #
- # The remaining ready and waiting elements must remain in the
- # same queue, and ready status must be determined at the moment
- # which the scheduler is asking for the next job.
- #
- skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
- wait = [elt for elt in elts if elt not in skip]
-
- self.skipped_elements.extend(skip) # Public record of skipped elements
- self._done_queue.extend(skip) # Elements to be processed
- self._wait_queue.extend(wait) # Elements eligible to be dequeued
+ # Obtain immediate element status
+ for elt in elts:
+ if self._required_element_check and not elt._is_required():
+ elt._set_required_callback(self._enqueue_element)
+ else:
+ self._enqueue_element(elt)
# dequeue()
#
@@ -181,35 +196,22 @@ class Queue():
# harvest_jobs()
#
- # Process elements in the queue, moving elements which were enqueued
- # into the dequeue pool, and creating as many jobs for which resources
+ # Spawn as many jobs from the ready queue for which resources
# can be reserved.
#
# Returns:
# ([Job]): A list of jobs which can be run now
#
def harvest_jobs(self):
- unready = []
ready = []
-
- while self._wait_queue:
- if not self._resources.reserve(self.resources, peek=True):
+ while self._ready_queue:
+ # Now reserve them
+ reserved = self._resources.reserve(self.resources)
+ if not reserved:
break
- element = self._wait_queue.popleft()
- status = self.status(element)
-
- if status == QueueStatus.WAIT:
- unready.append(element)
- elif status == QueueStatus.SKIP:
- self._done_queue.append(element)
- self.skipped_elements.append(element)
- else:
- reserved = self._resources.reserve(self.resources)
- assert reserved
- ready.append(element)
-
- self._wait_queue.extendleft(unready)
+ element = self._ready_queue.popleft()
+ ready.append(element)
return [
ElementJob(self._scheduler, self.action_name,
@@ -221,6 +223,13 @@ class Queue():
for element in ready
]
+ # set_required_element_check()
+ #
+ # This ensures that, for the first non-track queue, we must check
+ # whether elements are required before enqueuing them
+ def set_required_element_check(self):
+ self._required_element_check = True
+
#####################################################
# Private Methods #
#####################################################
@@ -326,3 +335,27 @@ class Queue():
logfile = "{key}-{action}".format(key=key, action=action)
return os.path.join(project.name, element.normal_name, logfile)
+
+ # _enqueue_element()
+ #
+ # Enqueue an Element upon a callback to a specific queue
+ # Here we check whether an element is either immediately ready to be processed
+ # in the current queue or whether it can skip the queue. Element's which are
+ # not yet ready to be processed or cannot skip will have the appropriate
+ # callback registered
+ #
+ # Args:
+ # element (Element): The Element to enqueue
+ #
+ def _enqueue_element(self, element):
+ status = self.status(element)
+ if status == QueueStatus.SKIP:
+ # Place skipped elements into the done queue immediately
+ self.skipped_elements.append(element) # Public record of skipped elements
+ self._done_queue.append(element) # Elements to proceed to the next queue
+ elif status == QueueStatus.READY:
+ # Push elements which are ready to be processed immediately into the queue
+ self._ready_queue.append(element)
+ else:
+ # Register a queue specific callback for pending elements
+ self.register_pending_element(element)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 2343c553c..537671679 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1180,6 +1180,7 @@ class Stream():
if not (track or self._first_non_track_queue):
self._first_non_track_queue = queue
+ self._first_non_track_queue.set_required_element_check()
# _enqueue_plan()
#