summaryrefslogtreecommitdiff
path: root/src/buildstream/_scheduler/queues/queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildstream/_scheduler/queues/queue.py')
-rw-r--r--src/buildstream/_scheduler/queues/queue.py25
1 files changed, 15 insertions, 10 deletions
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 7c577e7bd..f2cefd5d2 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -91,20 +91,25 @@ class Queue():
# Abstract Methods for Queue implementations #
#####################################################
- # process()
+ # get_process_func()
#
- # Abstract method for processing an element
+ # Abstract method, returns a callable for processing an element.
#
- # Args:
- # element (Element): An element to process
+ # The callable should fit the signature `process(element: Element) -> any`.
#
- # Returns:
- # (any): An optional something to be returned
- # for every element successfully processed
+ # Note that the callable may be executed in a child process, so the return
+ # value should be a simple object (must be pickle-able, i.e. strings,
+ # lists, dicts, numbers, but not Element instances). This is sent to back
+ # to the main process.
#
+ # This method is the only way for a queue to affect elements, and so is
+ # not optional to implement.
#
- def process(self, element):
- pass
+ # Returns:
+ # (Callable[[Element], Any]): The callable for processing elements.
+ #
+ def get_process_func(self):
+ raise NotImplementedError()
# status()
#
@@ -218,7 +223,7 @@ class Queue():
ElementJob(self._scheduler, self.action_name,
self._element_log_path(element),
element=element, queue=self,
- action_cb=self.process,
+ action_cb=self.get_process_func(),
complete_cb=self._job_done,
max_retries=self._max_retries)
for element in ready