diff options
Diffstat (limited to 'taskflow/engines/worker_based/executor.py')
-rw-r--r-- | taskflow/engines/worker_based/executor.py | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index dfbe5b3..70c448b 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -41,7 +41,7 @@ class WorkerTaskExecutor(executor.TaskExecutor): def __init__(self, uuid, exchange, topics, transition_timeout=pr.REQUEST_TIMEOUT, url=None, transport=None, transport_options=None, - retry_options=None): + retry_options=None, worker_expiry=pr.EXPIRES_AFTER): self._uuid = uuid self._ongoing_requests = {} self._ongoing_requests_lock = threading.RLock() @@ -57,7 +57,8 @@ class WorkerTaskExecutor(executor.TaskExecutor): # to workers to 'learn' of the tasks they can perform (and requires # pre-existing knowledge of the topics those workers are on to gather # and update this information). - self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics) + self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics, + worker_expiry=worker_expiry) self._proxy.dispatcher.type_handlers.update({ pr.RESPONSE: dispatcher.Handler(self._process_response, validator=pr.Response.validate), @@ -181,6 +182,9 @@ class WorkerTaskExecutor(executor.TaskExecutor): """This function is called cyclically between draining events.""" # Publish any finding messages (used to locate workers). self._finder.maybe_publish() + # If the finder hasn't heard from workers in a given amount + # of time, then those workers are likely dead, so clean them out... + self._finder.clean() # Process any expired requests or requests that have no current # worker located (publish messages for those if we now do have # a worker located). |