author    Jenkins <jenkins@review.openstack.org>  2015-01-01 05:56:51 +0000
committer Gerrit Code Review <review@openstack.org>  2015-01-01 05:56:51 +0000
commit    b18798520aabe320b556e45cd86403692d424408 (patch)
tree      e5d20f843affa509dfe1107c7fc3ea422cf03fd8
parent    3fa8bf305fec949b0748d91186cdd2ab8f0ba788 (diff)
parent    50c5441efdd4406d3cb2dac40823a9c318175d9b (diff)
Merge "Use a single shared queue for an executors lifecycle"
-rw-r--r--    taskflow/engines/action_engine/executor.py    344
1 file changed, 186 insertions(+), 158 deletions(-)
diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py
index de706aa..1ecd1ad 100644
--- a/taskflow/engines/action_engine/executor.py
+++ b/taskflow/engines/action_engine/executor.py
@@ -15,18 +15,18 @@
# under the License.
import abc
-import functools
+import collections
import multiprocessing
from multiprocessing import managers
import os
import pickle
-import threading
from oslo.utils import excutils
+from oslo.utils import reflection
from oslo.utils import timeutils
+from oslo.utils import uuidutils
import six
from six.moves import queue as compat_queue
-from six.moves import range as compat_range
from taskflow import logging
from taskflow import task as task_atom
@@ -52,16 +52,11 @@ _PICKLE_ERRORS = tuple(_PICKLE_ERRORS)
_SEND_ERRORS = (IOError, EOFError)
_UPDATE_PROGRESS = task_atom.EVENT_UPDATE_PROGRESS
-LOG = logging.getLogger(__name__)
-
+# Message types/kinds sent from worker/child processes...
+_KIND_COMPLETE_ME = 'complete_me'
+_KIND_EVENT = 'event'
-def _maybe_forever(limit=None):
- if limit is None:
- while True:
- yield
- else:
- for i in compat_range(0, limit):
- yield
+LOG = logging.getLogger(__name__)
def _execute_task(task, arguments, progress_callback=None):
@@ -99,12 +94,52 @@ def _revert_task(task, arguments, result, failures, progress_callback=None):
return (REVERTED, result)
-class _JoinedWorkItem(object):
+class _Channel(object):
+ """Helper wrapper around a multiprocessing queue used by a worker."""
+
+ def __init__(self, queue, identity):
+ self._queue = queue
+ self._identity = identity
+ self._sent_messages = collections.defaultdict(int)
+ self._pid = None
+
+ @property
+ def sent_messages(self):
+ return self._sent_messages
+
+ def put(self, message):
+    # NOTE(harlowja): this is done late in execution to ensure that this
+ # happens in the child process and not the parent process (where the
+ # constructor is called).
+ if self._pid is None:
+ self._pid = os.getpid()
+ message.update({
+ 'sent_on': timeutils.utcnow(),
+ 'sender': {
+ 'pid': self._pid,
+ 'id': self._identity,
+ },
+ })
+ if 'body' not in message:
+ message['body'] = {}
+ try:
+ self._queue.put(message)
+ except _PICKLE_ERRORS:
+ LOG.warn("Failed serializing message %s", message, exc_info=True)
+ return False
+ except _SEND_ERRORS:
+ LOG.warn("Failed sending message %s", message, exc_info=True)
+ return False
+ else:
+ self._sent_messages[message['kind']] += 1
+ return True
+
+
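A rough illustration (not part of the patch) of the message path the new
_Channel creates: the sketch below mimics what a _Channel-style put() stamps
onto a message before it crosses the process boundary on the single shared
queue. The plain Queue, the literal identity and the datetime call are
stand-ins for the manager queue, uuidutils and timeutils used above.

    import collections
    import os
    from datetime import datetime

    from six.moves import queue as compat_queue

    shared_queue = compat_queue.Queue()        # stand-in for manager.Queue()
    sent_messages = collections.defaultdict(int)
    identity = 'worker-1'                      # stand-in for a generated uuid

    message = {'kind': 'event',
               'body': {'event_type': 'update_progress',
                        'details': {'progress': 0.5}}}
    # Stamp the sender metadata in the sending process (the patch does this
    # lazily so it happens in the child, not the parent).
    message.update({
        'sent_on': datetime.utcnow(),          # stand-in for timeutils.utcnow()
        'sender': {'pid': os.getpid(), 'id': identity},
    })
    shared_queue.put(message)
    sent_messages[message['kind']] += 1        # mirrors _Channel._sent_messages
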
+class _WaitWorkItem(object):
"""The piece of work that will executed by a process executor.
- This will call the target function, then wait until the queues items
- have been completed (via calls to task_done) before offically being
- finished.
+    This will call the target function, then wait until the task's emitted
+    events/items have been depleted before officially being finished.
NOTE(harlowja): this is done so that the task function will *not* return
until all of its notifications have been proxied back to its originating
@@ -114,19 +149,28 @@ class _JoinedWorkItem(object):
previously finished task...
"""
- def __init__(self, queue, func, task, *args, **kwargs):
- self._queue = queue
+ def __init__(self, channel, barrier,
+ func, task, *args, **kwargs):
+ self._channel = channel
+ self._barrier = barrier
self._func = func
self._task = task
self._args = args
self._kwargs = kwargs
def _on_finish(self):
- w = timing.StopWatch()
- w.start()
- self._queue.join()
- LOG.blather("Waited %0.2f seconds until task '%s' emitted"
- " notifications were depleted", w.elapsed(), self._task)
+ sent_events = self._channel.sent_messages.get(_KIND_EVENT, 0)
+ if sent_events:
+ message = {
+ 'created_on': timeutils.utcnow(),
+ 'kind': _KIND_COMPLETE_ME,
+ }
+ if self._channel.put(message):
+ w = timing.StopWatch().start()
+ self._barrier.wait()
+ LOG.blather("Waited %s seconds until task '%s' %s emitted"
+ " notifications were depleted", w.elapsed(),
+ self._task, sent_events)
def __call__(self):
args = self._args
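The 'complete_me' handshake above spans two processes, which can be hard to
read from the diff alone. The following sketch is a stand-in using threads and
a plain Event/Queue rather than manager proxies; it only shows the shape of
the exchange between a _WaitWorkItem-style worker and the dispatcher.

    import threading

    from six.moves import queue as compat_queue

    shared_queue = compat_queue.Queue()
    barrier = threading.Event()        # stand-in for self._manager.Event()

    def worker():
        # Child side: emit an event, then ask to be released and block until
        # the dispatcher confirms everything sent so far has been drained.
        shared_queue.put({'kind': 'event',
                          'body': {'event_type': 'update_progress',
                                   'details': {}}})
        shared_queue.put({'kind': 'complete_me', 'body': {}})
        barrier.wait()

    def dispatcher():
        # Parent side: drain messages and release the worker on 'complete_me'.
        while True:
            message = shared_queue.get()
            if message['kind'] == 'complete_me':
                barrier.set()
                break

    w = threading.Thread(target=worker)
    d = threading.Thread(target=dispatcher)
    w.start()
    d.start()
    w.join()
    d.join()
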
@@ -140,57 +184,44 @@ class _JoinedWorkItem(object):
class _EventSender(object):
"""Sends event information from a child worker process to its creator."""
- def __init__(self, queue):
- self._queue = queue
- self._pid = None
+ def __init__(self, channel):
+ self._channel = channel
def __call__(self, event_type, details):
- # NOTE(harlowja): this is done in late in execution to ensure that this
- # happens in the child process and not the parent process (where the
- # constructor is called).
- if self._pid is None:
- self._pid = os.getpid()
message = {
'created_on': timeutils.utcnow(),
- 'sender': {
- 'pid': self._pid,
- },
+ 'kind': _KIND_EVENT,
'body': {
'event_type': event_type,
'details': details,
},
}
- try:
- self._queue.put(message)
- except _PICKLE_ERRORS:
- LOG.warn("Failed serializing message %s", message, exc_info=True)
- except _SEND_ERRORS:
- LOG.warn("Failed sending message %s", message, exc_info=True)
+ self._channel.put(message)
-class _EventTarget(object):
- """An immutable helper object that represents a target of an event."""
+class _Target(object):
+ """An immutable helper object that represents a target of a message."""
- def __init__(self, future, task, queue):
- self.future = future
+ def __init__(self, task, barrier, identity):
self.task = task
- self.queue = queue
+ self.barrier = barrier
+ self.identity = identity
+        # Counters used to track how many messages of each 'kind' were proxied...
+ self.dispatched = collections.defaultdict(int)
+
+ def __repr__(self):
+ return "<%s at 0x%x targeting '%s' with identity '%s'>" % (
+ reflection.get_class_name(self), id(self),
+ self.task, self.identity)
-class _EventDispatcher(object):
- """Dispatches event information received from child worker processes."""
+class _Dispatcher(object):
+ """Dispatches messages received from child worker processes."""
# When the run() method is busy (typically in a thread) we want to set
# these so that the thread can know how long to sleep when there is no
- # active work to dispatch (when there is active targets, there queues
- # will have amount/count of items removed before returning to work on
- # the next target...)
+ # active work to dispatch.
_SPIN_PERIODICITY = 0.01
- _SPIN_DISPATCH_AMOUNT = 1
-
- # TODO(harlowja): look again at using a non-polling mechanism that uses
- # select instead of queues to achieve better ability to detect when
- # messages are ready/available...
def __init__(self, dispatch_periodicity=None):
if dispatch_periodicity is None:
@@ -198,83 +229,84 @@ class _EventDispatcher(object):
if dispatch_periodicity <= 0:
raise ValueError("Provided dispatch periodicity must be greater"
" than zero and not '%s'" % dispatch_periodicity)
- self._targets = set()
+ self._targets = {}
self._dead = threading_utils.Event()
- self._lock = threading.Lock()
- self._periodicity = dispatch_periodicity
+ self._dispatch_periodicity = dispatch_periodicity
self._stop_when_empty = False
- def register(self, target):
- with self._lock:
- self._targets.add(target)
-
- def _dispatch_until_empty(self, target, limit=None):
- it = _maybe_forever(limit=limit)
- while True:
- try:
- six.next(it)
- except StopIteration:
- break
- else:
- try:
- message = target.queue.get_nowait()
- except compat_queue.Empty:
- break
- else:
- try:
- self._dispatch(target.task, message)
- finally:
- target.queue.task_done()
-
- def deregister(self, target):
- with self._lock:
- try:
- self._targets.remove(target)
- except KeyError:
- pass
+ def register(self, identity, target):
+ self._targets[identity] = target
+
+ def deregister(self, identity):
+ try:
+ target = self._targets.pop(identity)
+ except KeyError:
+ pass
+ else:
+            # Just in case, set the barrier to unblock any waiting worker...
+ target.barrier.set()
+ if LOG.isEnabledFor(logging.BLATHER):
+ LOG.blather("Dispatched %s messages %s to target '%s' during"
+ " the lifetime of its existence in the dispatcher",
+ sum(six.itervalues(target.dispatched)),
+ dict(target.dispatched), target)
def reset(self):
self._stop_when_empty = False
- while self._targets:
- self.deregister(self._targets.pop())
self._dead.clear()
+ if self._targets:
+ leftover = set(six.iterkeys(self._targets))
+ while leftover:
+ self.deregister(leftover.pop())
def interrupt(self):
self._stop_when_empty = True
self._dead.set()
- def _dispatch(self, task, message):
- LOG.blather("Dispatching message %s to task '%s'", message, task)
- body = message['body']
- task.notifier.notify(body['event_type'], body['details'])
-
- def _dispatch_iter(self, targets):
- # A generator that yields at certain points to allow the main run()
- # method to use this to dispatch in iterations (and also allows it
- # to check if it has been stopped between iterations).
- for target in targets:
- if target not in self._targets:
- # Must of been removed...
- continue
- # NOTE(harlowja): Limits are used here to avoid one
- # task unequally dispatching, this forces round-robin
- # like behavior...
- self._dispatch_until_empty(target,
- limit=self._SPIN_DISPATCH_AMOUNT)
- yield target
-
- def run(self):
- w = timing.StopWatch(duration=self._periodicity)
+ def _dispatch(self, message):
+ if LOG.isEnabledFor(logging.BLATHER):
+ LOG.blather("Dispatching message %s (it took %s seconds"
+ " for it to arrive for processing after being"
+ " sent)", message,
+ timeutils.delta_seconds(message['sent_on'],
+ timeutils.utcnow()))
+ try:
+ kind = message['kind']
+ sender = message['sender']
+ body = message['body']
+ except (KeyError, ValueError, TypeError):
+ LOG.warn("Badly formatted message %s received", message,
+ exc_info=True)
+ return
+ target = self._targets.get(sender['id'])
+ if target is None:
+            # Must have been removed...
+ return
+ if kind == _KIND_COMPLETE_ME:
+ target.dispatched[kind] += 1
+ target.barrier.set()
+ elif kind == _KIND_EVENT:
+ task = target.task
+ target.dispatched[kind] += 1
+ task.notifier.notify(body['event_type'], body['details'])
+ else:
+ LOG.warn("Unknown message '%s' found in message from sender"
+ " %s to target '%s'", kind, sender, target)
+
+ def run(self, queue):
+ w = timing.StopWatch(duration=self._dispatch_periodicity)
while (not self._dead.is_set() or
(self._stop_when_empty and self._targets)):
w.restart()
- with self._lock:
- targets = self._targets.copy()
- for _target in self._dispatch_iter(targets):
- if self._stop_when_empty:
- continue
- if self._dead.is_set():
+ leftover = w.leftover()
+ while leftover:
+ try:
+ message = queue.get(timeout=leftover)
+ except compat_queue.Empty:
break
+ else:
+ self._dispatch(message)
+ leftover = w.leftover()
leftover = w.leftover()
if leftover:
self._dead.wait(leftover)
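For reference, a minimal sketch of the timeout-bounded polling pattern that
run() now applies to the single shared queue; time.time() and a plain Queue
stand in for the StopWatch and manager queue used in the patch.

    import time

    from six.moves import queue as compat_queue

    def drain_for(q, handle, period=0.01):
        # Poll the queue for at most `period` seconds, dispatching whatever
        # arrives, then return so the caller can re-check its liveness flags.
        deadline = time.time() + period
        leftover = period
        while leftover > 0:
            try:
                message = q.get(timeout=leftover)
            except compat_queue.Empty:
                break
            handle(message)
            leftover = deadline - time.time()

    q = compat_queue.Queue()
    q.put({'kind': 'event', 'body': {}})
    drain_for(q, handle=lambda message: None, period=0.05)
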
@@ -402,12 +434,11 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
super(ParallelProcessTaskExecutor, self).__init__(
executor=executor, max_workers=max_workers)
self._manager = multiprocessing.Manager()
- self._dispatcher = _EventDispatcher(
+ self._dispatcher = _Dispatcher(
dispatch_periodicity=dispatch_periodicity)
+ # Only created after starting...
self._worker = None
-
- def _queue_factory(self):
- return self._manager.JoinableQueue()
+ self._queue = None
def _create_executor(self, max_workers=None):
return futures.ProcessPoolExecutor(max_workers=max_workers)
@@ -423,7 +454,9 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
self._manager.start()
if not threading_utils.is_alive(self._worker):
self._dispatcher.reset()
- self._worker = threading_utils.daemon_thread(self._dispatcher.run)
+ self._queue = self._manager.Queue()
+ self._worker = threading_utils.daemon_thread(self._dispatcher.run,
+ self._queue)
self._worker.start()
def stop(self):
@@ -432,11 +465,12 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
if threading_utils.is_alive(self._worker):
self._worker.join()
self._worker = None
+ self._queue = None
self._dispatcher.reset()
self._manager.shutdown()
self._manager.join()
- def _rebind_task(self, task, clone, queue, progress_callback=None):
+ def _rebind_task(self, task, clone, channel, progress_callback=None):
# Creates and binds proxies for all events the task could receive
# so that when the clone runs in another process that this task
# can receive the same notifications (thus making it look like the
@@ -448,8 +482,7 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
if progress_callback is not None:
needed.add(_UPDATE_PROGRESS)
for event_type in needed:
- clone.notifier.register(event_type, _EventSender(queue))
- return needed
+ clone.notifier.register(event_type, _EventSender(channel))
def _submit_task(self, func, task, *args, **kwargs):
"""Submit a function to run the given task (with given args/kwargs).
@@ -474,49 +507,44 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor):
any listeners) and then reattach a new set of listeners that will
now instead of calling the desired listeners just place messages
for this process (a dispatcher thread that is created in this class)
- to dispatch to the original task (using a per task queue that is used
- and associated to know which task to proxy back too, since it is
- possible that there many be *many* subprocess running at the same
- time, each running a different task).
+        to dispatch to the original task (using a common queue plus a per-task
+        sender identity/target that is used to associate messages with the
+        task to proxy back to, since it is possible that there may be *many*
+        subprocesses running at the same time, each running a different task
+        and all submitting messages back over the same common queue).
Once the subprocess task has finished execution, the executor will
- then trigger a callback (``on_done`` in this case) that will remove
- the task + queue from the dispatcher (which will stop any further
- proxying back to the original task).
+ then trigger a callback that will remove the task + target from the
+ dispatcher (which will stop any further proxying back to the original
+ task).
"""
progress_callback = kwargs.pop('progress_callback', None)
clone = task.copy(retain_listeners=False)
- queue = self._queue_factory()
- bound = self._rebind_task(task, clone, queue,
- progress_callback=progress_callback)
- LOG.blather("Bound %s event types to clone of '%s'", bound, task)
- if progress_callback is not None:
- binder = functools.partial(task.notifier.register,
- _UPDATE_PROGRESS, progress_callback)
- unbinder = functools.partial(task.notifier.deregister,
- _UPDATE_PROGRESS, progress_callback)
- else:
- binder = unbinder = lambda: None
-
- # Ensure the target task (not the clone) is ready and able to receive
- # dispatched messages (and start the dispatching process by
- # registering) with the dispatcher.
- binder()
- work = _JoinedWorkItem(queue, func, clone, *args, **kwargs)
+ identity = uuidutils.generate_uuid()
+ target = _Target(task, self._manager.Event(), identity)
+ channel = _Channel(self._queue, identity)
+ self._rebind_task(task, clone, channel,
+ progress_callback=progress_callback)
+
+ def register():
+ if progress_callback is not None:
+ task.notifier.register(_UPDATE_PROGRESS, progress_callback)
+ self._dispatcher.register(identity, target)
+
+ def deregister():
+ if progress_callback is not None:
+ task.notifier.deregister(_UPDATE_PROGRESS, progress_callback)
+ self._dispatcher.deregister(identity)
+
+ register()
+ work = _WaitWorkItem(channel, target.barrier,
+ func, clone, *args, **kwargs)
try:
fut = self._executor.submit(work)
except RuntimeError:
with excutils.save_and_reraise_exception():
- unbinder()
-
- # This will trigger the proxying to begin...
- target = _EventTarget(fut, task, queue)
- self._dispatcher.register(target)
-
- def on_done(unbinder, target, fut):
- self._dispatcher.deregister(target)
- unbinder()
+ deregister()
fut.atom = task
- fut.add_done_callback(functools.partial(on_done, unbinder, target))
+ fut.add_done_callback(lambda fut: deregister())
return fut
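
Pulling the pieces together, here is a hedged sketch of the per-task
registration lifecycle that _submit_task now follows: a uuid identity, a
target holding a manager Event barrier, and deregistration once the work is
done. The register/deregister helpers and the targets dict are illustrative
stand-ins, not the executor's actual attributes.

    import multiprocessing
    import uuid

    def main():
        manager = multiprocessing.Manager()
        # One queue shared for the executor's whole lifetime; workers would
        # tag their messages with the identity handed out below.
        shared_queue = manager.Queue()
        targets = {}    # identity -> (task_name, barrier)

        def register(task_name):
            identity = str(uuid.uuid4())    # stand-in for uuidutils
            targets[identity] = (task_name, manager.Event())
            return identity

        def deregister(identity):
            _task_name, barrier = targets.pop(identity, (None, None))
            if barrier is not None:
                barrier.set()    # unblock any worker still waiting

        ident = register('my-task')
        # ... submit work that sends messages tagged with `ident` over
        # shared_queue, then dispatch them back to the original task ...
        deregister(ident)
        manager.shutdown()

    if __name__ == '__main__':
        main()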