summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/buildstream/_scheduler/queues/buildqueue.py12
-rw-r--r--src/buildstream/_scheduler/queues/fetchqueue.py12
-rw-r--r--src/buildstream/_scheduler/queues/pullqueue.py13
-rw-r--r--src/buildstream/_scheduler/queues/queue.py107
-rw-r--r--src/buildstream/_stream.py1
5 files changed, 90 insertions, 55 deletions
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index dc82f54ec..b280661cc 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -70,16 +70,11 @@ class BuildQueue(Queue):
return element._assemble()
def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
-
if element._cached_success():
return QueueStatus.SKIP
if not element._buildable():
- return QueueStatus.WAIT
+ return QueueStatus.PENDING
return QueueStatus.READY
@@ -115,3 +110,8 @@ class BuildQueue(Queue):
#
if status is JobStatus.OK:
self._check_cache_size(job, element, result)
+
+ def register_pending_element(self, element):
+ # Set a "buildable" callback for an element not yet ready
+ # to be processed in the build queue.
+ element._set_buildable_callback(self._enqueue_element)
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 90db77f42..bbb3b3d78 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -45,15 +45,10 @@ class FetchQueue(Queue):
element._fetch(fetch_original=self._fetch_original)
def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
-
# Optionally skip elements that are already in the artifact cache
if self._skip_cached:
if not element._can_query_cache():
- return QueueStatus.WAIT
+ return QueueStatus.PENDING
if element._cached():
return QueueStatus.SKIP
@@ -78,3 +73,8 @@ class FetchQueue(Queue):
assert element._get_consistency() == Consistency.CACHED
else:
assert element._source_cached()
+
+ def register_pending_element(self, element):
+ # Set a "can_query_cache" callback for an element not yet ready
+ # to be processed in the fetch queue.
+ element._set_can_query_cache_callback(self._enqueue_element)
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 374181cda..245293342 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -39,13 +39,8 @@ class PullQueue(Queue):
raise SkipJob(self.action_name)
def status(self, element):
- if not element._is_required():
- # Artifact is not currently required but it may be requested later.
- # Keep it in the queue.
- return QueueStatus.WAIT
-
if not element._can_query_cache():
- return QueueStatus.WAIT
+ return QueueStatus.PENDING
if element._pull_pending():
return QueueStatus.READY
@@ -64,3 +59,9 @@ class PullQueue(Queue):
# actually check the cache size.
if status is JobStatus.OK:
self._scheduler.check_cache_size()
+
+ def register_pending_element(self, element):
+ # Set a "can_query_cache"_callback for an element which is not
+ # immediately ready to query the artifact cache so that it
+ # may be pulled.
+ element._set_can_query_cache_callback(self._enqueue_element)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 7740896b5..951b28c2a 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -29,7 +29,7 @@ from ..jobs import ElementJob, JobStatus
from ..resources import ResourceType
# BuildStream toplevel imports
-from ..._exceptions import BstError, set_last_task_error
+from ..._exceptions import BstError, ImplError, set_last_task_error
from ..._message import Message, MessageType
@@ -37,8 +37,8 @@ from ..._message import Message, MessageType
#
#
class QueueStatus(Enum):
- # The element is waiting for dependencies.
- WAIT = 1
+ # The element is not yet ready to be processed in the queue.
+ PENDING = 1
# The element can skip this queue.
SKIP = 2
@@ -73,10 +73,12 @@ class Queue():
#
self._scheduler = scheduler
self._resources = scheduler.resources # Shared resource pool
- self._wait_queue = deque() # Ready / Waiting elements
+ self._ready_queue = deque() # Ready elements
self._done_queue = deque() # Processed / Skipped elements
self._max_retries = 0
+ self._required_element_check = False # Whether we should check that elements are required before enqueuing
+
# Assert the subclass has setup class data
assert self.action_name is not None
assert self.complete_name is not None
@@ -105,7 +107,9 @@ class Queue():
# status()
#
- # Abstract method for reporting the status of an element.
+ # Abstract method for reporting the immediate status of an element. The status
+ # determines whether an element can/cannot be pushed into the queue, or even
+ # skip the queue entirely, when called.
#
# Args:
# element (Element): An element to process
@@ -130,6 +134,23 @@ class Queue():
pass
#####################################################
+ # Virtual Methods for Queue implementations #
+ #####################################################
+
+ # register_pending_element()
+ #
+ # Virtual method for registering a queue specific callback
+ # to an Element which is not immediately ready to advance
+ # to the next queue
+ #
+ # Args:
+ # element (Element): The element waiting to be pushed into the queue
+ #
+ def register_pending_element(self, element):
+ raise ImplError("Queue type: {} does not implement register_pending_element()"
+ .format(self.action_name))
+
+ #####################################################
# Scheduler / Pipeline facing APIs #
#####################################################
@@ -144,18 +165,12 @@ class Queue():
if not elts:
return
- # Place skipped elements on the done queue right away.
- #
- # The remaining ready and waiting elements must remain in the
- # same queue, and ready status must be determined at the moment
- # which the scheduler is asking for the next job.
- #
- skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
- wait = [elt for elt in elts if elt not in skip]
-
- self.skipped_elements.extend(skip) # Public record of skipped elements
- self._done_queue.extend(skip) # Elements to be processed
- self._wait_queue.extend(wait) # Elements eligible to be dequeued
+ # Obtain immediate element status
+ for elt in elts:
+ if self._required_element_check and not elt._is_required():
+ elt._set_required_callback(self._enqueue_element)
+ else:
+ self._enqueue_element(elt)
# dequeue()
#
@@ -181,35 +196,22 @@ class Queue():
# harvest_jobs()
#
- # Process elements in the queue, moving elements which were enqueued
- # into the dequeue pool, and creating as many jobs for which resources
+ # Spawn as many jobs from the ready queue for which resources
# can be reserved.
#
# Returns:
# ([Job]): A list of jobs which can be run now
#
def harvest_jobs(self):
- unready = []
ready = []
-
- while self._wait_queue:
- if not self._resources.reserve(self.resources, peek=True):
+ while self._ready_queue:
+ # Now reserve them
+ reserved = self._resources.reserve(self.resources)
+ if not reserved:
break
- element = self._wait_queue.popleft()
- status = self.status(element)
-
- if status == QueueStatus.WAIT:
- unready.append(element)
- elif status == QueueStatus.SKIP:
- self._done_queue.append(element)
- self.skipped_elements.append(element)
- else:
- reserved = self._resources.reserve(self.resources)
- assert reserved
- ready.append(element)
-
- self._wait_queue.extendleft(unready)
+ element = self._ready_queue.popleft()
+ ready.append(element)
return [
ElementJob(self._scheduler, self.action_name,
@@ -221,6 +223,13 @@ class Queue():
for element in ready
]
+ # set_required_element_check()
+ #
+ # This ensures that, for the first non-track queue, we must check
+ # whether elements are required before enqueuing them
+ def set_required_element_check(self):
+ self._required_element_check = True
+
#####################################################
# Private Methods #
#####################################################
@@ -326,3 +335,27 @@ class Queue():
logfile = "{key}-{action}".format(key=key, action=action)
return os.path.join(project.name, element.normal_name, logfile)
+
+ # _enqueue_element()
+ #
+ # Enqueue an Element upon a callback to a specific queue
+ # Here we check whether an element is either immediately ready to be processed
+ # in the current queue or whether it can skip the queue. Element's which are
+ # not yet ready to be processed or cannot skip will have the appropriate
+ # callback registered
+ #
+ # Args:
+ # element (Element): The Element to enqueue
+ #
+ def _enqueue_element(self, element):
+ status = self.status(element)
+ if status == QueueStatus.SKIP:
+ # Place skipped elements into the done queue immediately
+ self.skipped_elements.append(element) # Public record of skipped elements
+ self._done_queue.append(element) # Elements to proceed to the next queue
+ elif status == QueueStatus.READY:
+ # Push elements which are ready to be processed immediately into the queue
+ self._ready_queue.append(element)
+ else:
+ # Register a queue specific callback for pending elements
+ self.register_pending_element(element)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 2343c553c..537671679 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1180,6 +1180,7 @@ class Stream():
if not (track or self._first_non_track_queue):
self._first_non_track_queue = queue
+ self._first_non_track_queue.set_required_element_check()
# _enqueue_plan()
#