diff options
Diffstat (limited to 'src/buildstream/_scheduler/queues/queue.py')
-rw-r--r-- | src/buildstream/_scheduler/queues/queue.py | 25 |
1 files changed, 15 insertions, 10 deletions
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 7c577e7bd..f2cefd5d2 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -91,20 +91,25 @@ class Queue(): # Abstract Methods for Queue implementations # ##################################################### - # process() + # get_process_func() # - # Abstract method for processing an element + # Abstract method, returns a callable for processing an element. # - # Args: - # element (Element): An element to process + # The callable should fit the signature `process(element: Element) -> any`. # - # Returns: - # (any): An optional something to be returned - # for every element successfully processed + # Note that the callable may be executed in a child process, so the return + # value should be a simple object (must be pickle-able, i.e. strings, + # lists, dicts, numbers, but not Element instances). This is sent to back + # to the main process. # + # This method is the only way for a queue to affect elements, and so is + # not optional to implement. # - def process(self, element): - pass + # Returns: + # (Callable[[Element], Any]): The callable for processing elements. + # + def get_process_func(self): + raise NotImplementedError() # status() # @@ -218,7 +223,7 @@ class Queue(): ElementJob(self._scheduler, self.action_name, self._element_log_path(element), element=element, queue=self, - action_cb=self.process, + action_cb=self.get_process_func(), complete_cb=self._job_done, max_retries=self._max_retries) for element in ready |