summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJames Ennis <james.ennis@codethink.co.uk>2019-05-15 10:10:18 +0100
committerJürg Billeter <j@bitron.ch>2019-05-23 16:36:03 +0200
commit8781540e7d387da5079d7ff055a3b52b3bd83c08 (patch)
treef5c15d91a52464690a1fe6051843e0a907983c1e /src
parente1ae5eadff395825cb52ae929613f4e422040c89 (diff)
downloadbuildstream-8781540e7d387da5079d7ff055a3b52b3bd83c08.tar.gz
queue.py: Provide virtual method to register callbacks for unready elements
When using push-based queues, elements which are not immediately ready to be processed in their queue need to have an appropriate callback registered. This will inform the queue when the element is ready to be processed.
Diffstat (limited to 'src')
-rw-r--r--src/buildstream/_scheduler/queues/queue.py24
1 files changed, 22 insertions, 2 deletions
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 8e2fd981a..459e3040d 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -29,7 +29,7 @@ from ..jobs import ElementJob, JobStatus
from ..resources import ResourceType
# BuildStream toplevel imports
-from ..._exceptions import BstError, set_last_task_error
+from ..._exceptions import BstError, ImplError, set_last_task_error
from ..._message import Message, MessageType
@@ -130,6 +130,24 @@ class Queue():
pass
#####################################################
+ # Virtual Methods for Queue implementations #
+ #####################################################
+
+ # register_waiting_elements()
+ #
+ # Virtual method for registering queue specific callbacks
+ # to Elements which are not immediately ready to advance
+ # to the next queue
+ #
+ # Args:
+ # elements (list): A list of elements waiting to be
+ # pushed into the queue
+ #
+ def register_waiting_elements(self, elements):
+ raise ImplError("Queue type: {} does not implement register_waiting_elements()"
+ .format(self.action_name))
+
+ #####################################################
# Scheduler / Pipeline facing APIs #
#####################################################
@@ -163,7 +181,9 @@ class Queue():
self._done_queue.extend(skip) # Elements to proceed to the next queue
self._ready_queue.extend(ready) # Elements ready to perform the job
- # TODO: Register callbacks for the elements which are not yet ready
+ # Register callbacks for the waiting elements
+ if wait:
+ self.register_waiting_elements(wait)
# dequeue()
#