diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-02-15 03:06:10 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-02-15 03:06:10 +0000 |
commit | 6241a5c5920e5997fe1cc392413cc7ce4536c33c (patch) | |
tree | d8a46857e22b9062e92cb11e20d28c71922aca73 | |
parent | 7ea2bfc2974c4854e8d718383af961acb8daffeb (diff) | |
parent | a70bd8a7e59f52bc20dd4e219c4242b0f15664b4 (diff) | |
download | taskflow-6241a5c5920e5997fe1cc392413cc7ce4536c33c.tar.gz |
Merge "Remove need for separate notify thread"
-rw-r--r-- | taskflow/engines/worker_based/executor.py | 59 | ||||
-rw-r--r-- | taskflow/engines/worker_based/types.py | 54 | ||||
-rw-r--r-- | taskflow/tests/unit/worker_based/test_executor.py | 5 |
3 files changed, 69 insertions, 49 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index d274953..46c3fdf 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -17,7 +17,6 @@ import functools import threading -from futurist import periodics from oslo_utils import timeutils import six @@ -47,12 +46,7 @@ class WorkerTaskExecutor(executor.TaskExecutor): self._ongoing_requests = {} self._ongoing_requests_lock = threading.RLock() self._transition_timeout = transition_timeout - type_handlers = { - pr.RESPONSE: dispatcher.Handler(self._process_response, - validator=pr.Response.validate), - } self._proxy = proxy.Proxy(uuid, exchange, - type_handlers=type_handlers, on_wait=self._on_wait, url=url, transport=transport, transport_options=transport_options, @@ -64,16 +58,17 @@ class WorkerTaskExecutor(executor.TaskExecutor): # pre-existing knowledge of the topics those workers are on to gather # and update this information). self._finder = wt.ProxyWorkerFinder(uuid, self._proxy, topics) - self._helpers = tu.ThreadBundle() - self._helpers.bind(lambda: tu.daemon_thread(self._proxy.start), - after_start=lambda t: self._proxy.wait(), - before_join=lambda t: self._proxy.stop()) - p_worker = periodics.PeriodicWorker.create([self._finder]) - if p_worker: - self._helpers.bind(lambda: tu.daemon_thread(p_worker.start), - before_join=lambda t: p_worker.stop(), - after_join=lambda t: p_worker.reset(), - before_start=lambda t: p_worker.reset()) + self._proxy.dispatcher.type_handlers.update({ + pr.RESPONSE: dispatcher.Handler(self._process_response, + validator=pr.Response.validate), + pr.NOTIFY: dispatcher.Handler( + self._finder.process_response, + validator=functools.partial(pr.Notify.validate, + response=True)), + }) + # Thread that will run the message dispatching (and periodically + # call the on_wait callback to do various things) loop... + self._helper = None self._messages_processed = { 'finder': self._finder.messages_processed, } @@ -138,8 +133,9 @@ class WorkerTaskExecutor(executor.TaskExecutor): return True return False - def _on_wait(self): - """This function is called cyclically between draining events.""" + def _clean(self): + if not self._ongoing_requests: + return with self._ongoing_requests_lock: ongoing_requests_uuids = set(six.iterkeys(self._ongoing_requests)) waiting_requests = {} @@ -178,6 +174,15 @@ class WorkerTaskExecutor(executor.TaskExecutor): self._publish_request(request, worker) self._messages_processed['finder'] = new_messages_processed + def _on_wait(self): + """This function is called cyclically between draining events.""" + # Publish any finding messages (used to locate workers). + self._finder.maybe_publish() + # Process any expired requests or requests that have no current + # worker located (publish messages for those if we now do have + # a worker located). + self._clean() + def _submit_task(self, task, task_uuid, action, arguments, progress_callback=None, **kwargs): """Submit task request to a worker.""" @@ -249,15 +254,23 @@ class WorkerTaskExecutor(executor.TaskExecutor): timeout=timeout) def start(self): - """Starts proxy thread and associated topic notification thread.""" - self._helpers.start() + """Starts message processing thread.""" + if self._helper is not None: + raise RuntimeError("Worker executor must be stopped before" + " it can be started") + self._helper = tu.daemon_thread(self._proxy.start) + self._helper.start() + self._proxy.wait() def stop(self): - """Stops proxy thread and associated topic notification thread.""" - self._helpers.stop() + """Stops message processing thread.""" + if self._helper is not None: + self._proxy.stop() + self._helper.join() + self._helper = None with self._ongoing_requests_lock: while self._ongoing_requests: _request_uuid, request = self._ongoing_requests.popitem() self._handle_expired_request(request) - self._finder.clear() + self._finder.reset() self._messages_processed['finder'] = self._finder.messages_processed diff --git a/taskflow/engines/worker_based/types.py b/taskflow/engines/worker_based/types.py index c8f11e2..6bc215e 100644 --- a/taskflow/engines/worker_based/types.py +++ b/taskflow/engines/worker_based/types.py @@ -15,17 +15,13 @@ # under the License. import abc -import functools -import itertools import random import threading -from futurist import periodics from oslo_utils import reflection from oslo_utils import timeutils import six -from taskflow.engines.worker_based import dispatcher from taskflow.engines.worker_based import protocol as pr from taskflow import logging from taskflow.utils import kombu_utils as ku @@ -132,27 +128,21 @@ class WorkerFinder(object): def get_worker_for_task(self, task): """Gets a worker that can perform a given task.""" - def clear(self): - pass - class ProxyWorkerFinder(WorkerFinder): """Requests and receives responses about workers topic+task details.""" - def __init__(self, uuid, proxy, topics): + def __init__(self, uuid, proxy, topics, + beat_periodicity=pr.NOTIFY_PERIOD): super(ProxyWorkerFinder, self).__init__() self._proxy = proxy self._topics = topics self._workers = {} self._uuid = uuid - self._proxy.dispatcher.type_handlers.update({ - pr.NOTIFY: dispatcher.Handler( - self._process_response, - validator=functools.partial(pr.Notify.validate, - response=True)), - }) - self._counter = itertools.count() + self._seen_workers = 0 self._messages_processed = 0 + self._messages_published = 0 + self._watch = timeutils.StopWatch(duration=beat_periodicity) @property def messages_processed(self): @@ -160,15 +150,30 @@ class ProxyWorkerFinder(WorkerFinder): def _next_worker(self, topic, tasks, temporary=False): if not temporary: - return TopicWorker(topic, tasks, - identity=six.next(self._counter)) + w = TopicWorker(topic, tasks, identity=self._seen_workers) + self._seen_workers += 1 + return w else: return TopicWorker(topic, tasks) - @periodics.periodic(pr.NOTIFY_PERIOD, run_immediately=True) - def beat(self): - """Cyclically called to publish notify message to each topic.""" - self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid) + def maybe_publish(self): + """Periodically called to publish notify message to each topic. + + These messages (especially the responses) are how this find learns + about workers and what tasks they can perform (so that we can then + match workers to tasks to run). + """ + if self._messages_published == 0: + self._proxy.publish(pr.Notify(), + self._topics, reply_to=self._uuid) + self._messages_published += 1 + self._watch.restart() + else: + if self._watch.expired(): + self._proxy.publish(pr.Notify(), + self._topics, reply_to=self._uuid) + self._messages_published += 1 + self._watch.restart() def _total_workers(self): return len(self._workers) @@ -191,7 +196,7 @@ class ProxyWorkerFinder(WorkerFinder): self._workers[topic] = worker return (worker, True) - def _process_response(self, data, message): + def process_response(self, data, message): """Process notify message sent from remote side.""" LOG.debug("Started processing notify response message '%s'", ku.DelayedPretty(message)) @@ -206,9 +211,12 @@ class ProxyWorkerFinder(WorkerFinder): self._cond.notify_all() self._messages_processed += 1 - def clear(self): + def reset(self): with self._cond: self._workers.clear() + self._messages_processed = 0 + self._messages_published = 0 + self._seen_workers = 0 self._cond.notify_all() def get_worker_for_task(self, task): diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 372f48b..d81fd78 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -85,8 +85,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): on_wait=ex._on_wait, url=self.broker_url, transport=mock.ANY, transport_options=mock.ANY, - retry_options=mock.ANY, - type_handlers=mock.ANY), + retry_options=mock.ANY), mock.call.proxy.dispatcher.type_handlers.update(mock.ANY), ] self.assertEqual(master_mock_calls, self.master_mock.mock_calls) @@ -284,7 +283,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertTrue(self.proxy_started_event.wait(test_utils.WAIT_TIMEOUT)) # start executor again - ex.start() + self.assertRaises(RuntimeError, ex.start) # stop executor ex.stop() |