summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-02-15 03:06:10 +0000
committerGerrit Code Review <review@openstack.org>2016-02-15 03:06:10 +0000
commit6241a5c5920e5997fe1cc392413cc7ce4536c33c (patch)
treed8a46857e22b9062e92cb11e20d28c71922aca73
parent7ea2bfc2974c4854e8d718383af961acb8daffeb (diff)
parenta70bd8a7e59f52bc20dd4e219c4242b0f15664b4 (diff)
downloadtaskflow-6241a5c5920e5997fe1cc392413cc7ce4536c33c.tar.gz
Merge "Remove need for separate notify thread"
-rw-r--r--taskflow/engines/worker_based/executor.py59
-rw-r--r--taskflow/engines/worker_based/types.py54
-rw-r--r--taskflow/tests/unit/worker_based/test_executor.py5
3 files changed, 69 insertions, 49 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index d274953..46c3fdf 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -17,7 +17,6 @@
import functools
import threading
-from futurist import periodics
from oslo_utils import timeutils
import six
@@ -47,12 +46,7 @@ class WorkerTaskExecutor(executor.TaskExecutor):
self._ongoing_requests = {}
self._ongoing_requests_lock = threading.RLock()
self._transition_timeout = transition_timeout
- type_handlers = {
- pr.RESPONSE: dispatcher.Handler(self._process_response,
- validator=pr.Response.validate),
- }
self._proxy = proxy.Proxy(uuid, exchange,
- type_handlers=type_handlers,
on_wait=self._on_wait, url=url,
transport=transport,
transport_options=transport_options,
@@ -64,16 +58,17 @@ class WorkerTaskExecutor(executor.TaskExecutor):
# pre-existing knowledge of the topics those workers are on to gather
# and update this information).
self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics)
- self._helpers = tu.ThreadBundle()
- self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start),
- after_start=lambda t: self._proxy.wait(),
- before_join=lambda t: self._proxy.stop())
- p_worker = periodics.PeriodicWorker.create([self._finder])
- if p_worker:
- self._helpers.bind(lambda: tu.daemon_thread(p_worker.start),
- before_join=lambda t: p_worker.stop(),
- after_join=lambda t: p_worker.reset(),
- before_start=lambda t: p_worker.reset())
+ self._proxy.dispatcher.type_handlers.update({
+ pr.RESPONSE: dispatcher.Handler(self._process_response,
+ validator=pr.Response.validate),
+ pr.NOTIFY: dispatcher.Handler(
+ self._finder.process_response,
+ validator=functools.partial(pr.Notify.validate,
+ response=True)),
+ })
+ # Thread that will run the message dispatching (and periodically
+ # call the on_wait callback to do various things) loop...
+ self._helper = None
self._messages_processed = {
'finder': self._finder.messages_processed,
}
@@ -138,8 +133,9 @@ class WorkerTaskExecutor(executor.TaskExecutor):
return True
return False
- def _on_wait(self):
- """This function is called cyclically between draining events."""
+ def _clean(self):
+ if not self._ongoing_requests:
+ return
with self._ongoing_requests_lock:
ongoing_requests_uuids = set(six.iterkeys(self._ongoing_requests))
waiting_requests = {}
@@ -178,6 +174,15 @@ class WorkerTaskExecutor(executor.TaskExecutor):
self._publish_request(request, worker)
self._messages_processed['finder'] = new_messages_processed
+ def _on_wait(self):
+ """This function is called cyclically between draining events."""
+ # Publish any finding messages (used to locate workers).
+ self._finder.maybe_publish()
+ # Process any expired requests or requests that have no current
+ # worker located (publish messages for those if we now do have
+ # a worker located).
+ self._clean()
+
def _submit_task(self, task, task_uuid, action, arguments,
progress_callback=None, **kwargs):
"""Submit task request to a worker."""
@@ -249,15 +254,23 @@ class WorkerTaskExecutor(executor.TaskExecutor):
timeout=timeout)
def start(self):
- """Starts proxy thread and associated topic notification thread."""
- self._helpers.start()
+ """Starts message processing thread."""
+ if self._helper is not None:
+ raise RuntimeError("Worker executor must be stopped before"
+ " it can be started")
+ self._helper = tu.daemon_thread(self._proxy.start)
+ self._helper.start()
+ self._proxy.wait()
def stop(self):
- """Stops proxy thread and associated topic notification thread."""
- self._helpers.stop()
+ """Stops message processing thread."""
+ if self._helper is not None:
+ self._proxy.stop()
+ self._helper.join()
+ self._helper = None
with self._ongoing_requests_lock:
while self._ongoing_requests:
_request_uuid, request = self._ongoing_requests.popitem()
self._handle_expired_request(request)
- self._finder.clear()
+ self._finder.reset()
self._messages_processed['finder'] = self._finder.messages_processed
diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py
index c8f11e2..6bc215e 100644
--- a/taskflow/engines/worker_based/types.py
+++ b/taskflow/engines/worker_based/types.py
@@ -15,17 +15,13 @@
# under the License.
import abc
-import functools
-import itertools
import random
import threading
-from futurist import periodics
from oslo_utils import reflection
from oslo_utils import timeutils
import six
-from taskflow.engines.worker_based import dispatcher
from taskflow.engines.worker_based import protocol as pr
from taskflow import logging
from taskflow.utils import kombu_utils as ku
@@ -132,27 +128,21 @@ class WorkerFinder(object):
def get_worker_for_task(self, task):
"""Gets a worker that can perform a given task."""
- def clear(self):
- pass
-
class ProxyWorkerFinder(WorkerFinder):
"""Requests and receives responses about workers topic+task details."""
- def __init__(self, uuid, proxy, topics):
+ def __init__(self, uuid, proxy, topics,
+ beat_periodicity=pr.NOTIFY_PERIOD):
super(ProxyWorkerFinder, self).__init__()
self._proxy = proxy
self._topics = topics
self._workers = {}
self._uuid = uuid
- self._proxy.dispatcher.type_handlers.update({
- pr.NOTIFY: dispatcher.Handler(
- self._process_response,
- validator=functools.partial(pr.Notify.validate,
- response=True)),
- })
- self._counter = itertools.count()
+ self._seen_workers = 0
self._messages_processed = 0
+ self._messages_published = 0
+ self._watch = timeutils.StopWatch(duration=beat_periodicity)
@property
def messages_processed(self):
@@ -160,15 +150,30 @@ class ProxyWorkerFinder(WorkerFinder):
def _next_worker(self, topic, tasks, temporary=False):
if not temporary:
- return TopicWorker(topic, tasks,
- identity=six.next(self._counter))
+ w = TopicWorker(topic, tasks, identity=self._seen_workers)
+ self._seen_workers += 1
+ return w
else:
return TopicWorker(topic, tasks)
- @periodics.periodic(pr.NOTIFY_PERIOD, run_immediately=True)
- def beat(self):
- """Cyclically called to publish notify message to each topic."""
- self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid)
+ def maybe_publish(self):
+ """Periodically called to publish notify message to each topic.
+
+ These messages (especially the responses) are how this find learns
+ about workers and what tasks they can perform (so that we can then
+ match workers to tasks to run).
+ """
+ if self._messages_published == 0:
+ self._proxy.publish(pr.Notify(),
+ self._topics, reply_to=self._uuid)
+ self._messages_published += 1
+ self._watch.restart()
+ else:
+ if self._watch.expired():
+ self._proxy.publish(pr.Notify(),
+ self._topics, reply_to=self._uuid)
+ self._messages_published += 1
+ self._watch.restart()
def _total_workers(self):
return len(self._workers)
@@ -191,7 +196,7 @@ class ProxyWorkerFinder(WorkerFinder):
self._workers[topic] = worker
return (worker, True)
- def _process_response(self, data, message):
+ def process_response(self, data, message):
"""Process notify message sent from remote side."""
LOG.debug("Started processing notify response message '%s'",
ku.DelayedPretty(message))
@@ -206,9 +211,12 @@ class ProxyWorkerFinder(WorkerFinder):
self._cond.notify_all()
self._messages_processed += 1
- def clear(self):
+ def reset(self):
with self._cond:
self._workers.clear()
+ self._messages_processed = 0
+ self._messages_published = 0
+ self._seen_workers = 0
self._cond.notify_all()
def get_worker_for_task(self, task):
diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py
index 372f48b..d81fd78 100644
--- a/taskflow/tests/unit/worker_based/test_executor.py
+++ b/taskflow/tests/unit/worker_based/test_executor.py
@@ -85,8 +85,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
on_wait=ex._on_wait,
url=self.broker_url, transport=mock.ANY,
transport_options=mock.ANY,
- retry_options=mock.ANY,
- type_handlers=mock.ANY),
+ retry_options=mock.ANY),
mock.call.proxy.dispatcher.type_handlers.update(mock.ANY),
]
self.assertEqual(master_mock_calls, self.master_mock.mock_calls)
@@ -284,7 +283,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT))
# start executor again
- ex.start()
+ self.assertRaises(RuntimeError, ex.start)
# stop executor
ex.stop()