summaryrefslogtreecommitdiff
path: root/taskflow/engines/worker_based/executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow/engines/worker_based/executor.py')
-rw-r--r--taskflow/engines/worker_based/executor.py8
1 files changed, 6 insertions, 2 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index dfbe5b3..70c448b 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -41,7 +41,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
def __init__(self, uuid, exchange, topics,
transition_timeout=pr.REQUEST_TIMEOUT,
url=None, transport=None, transport_options=None,
- retry_options=None):
+ retry_options=None, worker_expiry=pr.EXPIRES_AFTER):
self._uuid = uuid
self._ongoing_requests = {}
self._ongoing_requests_lock = threading.RLock()
@@ -57,7 +57,8 @@ class WorkerTaskExecutor(executor.TaskExecutor):
# to workers to 'learn' of the tasks they can perform (and requires
# pre-existing knowledge of the topics those workers are on to gather
# and update this information).
- self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics)
+ self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics,
+ worker_expiry=worker_expiry)
self._proxy.dispatcher.type_handlers.update({
pr.RESPONSE: dispatcher.Handler(self._process_response,
validator=pr.Response.validate),
@@ -181,6 +182,9 @@ class WorkerTaskExecutor(executor.TaskExecutor):
"""This function is called cyclically between draining events."""
# Publish any finding messages (used to locate workers).
self._finder.maybe_publish()
+ # If the finder hasn't heard from workers in a given amount
+ # of time, then those workers are likely dead, so clean them out...
+ self._finder.clean()
# Process any expired requests or requests that have no current
# worker located (publish messages for those if we now do have
# a worker located).