diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-01-01 05:56:51 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-01-01 05:56:51 +0000 |
commit | b18798520aabe320b556e45cd86403692d424408 (patch) | |
tree | e5d20f843affa509dfe1107c7fc3ea422cf03fd8 | |
parent | 3fa8bf305fec949b0748d91186cdd2ab8f0ba788 (diff) | |
parent | 50c5441efdd4406d3cb2dac40823a9c318175d9b (diff) | |
download | taskflow-b18798520aabe320b556e45cd86403692d424408.tar.gz |
Merge "Use a single shared queue for an executors lifecycle"
-rw-r--r-- | taskflow/engines/action_engine/executor.py | 344 |
1 files changed, 186 insertions, 158 deletions
diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index de706aa..1ecd1ad 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -15,18 +15,18 @@ # under the License. import abc -import functools +import collections import multiprocessing from multiprocessing import managers import os import pickle -import threading from oslo.utils import excutils +from oslo.utils import reflection from oslo.utils import timeutils +from oslo.utils import uuidutils import six from six.moves import queue as compat_queue -from six.moves import range as compat_range from taskflow import logging from taskflow import task as task_atom @@ -52,16 +52,11 @@ _PICKLE_ERRORS = tuple(_PICKLE_ERRORS) _SEND_ERRORS = (IOError, EOFError) _UPDATE_PROGRESS = task_atom.EVENT_UPDATE_PROGRESS -LOG = logging.getLogger(__name__) - +# Message types/kind sent from worker/child processes... +_KIND_COMPLETE_ME = 'complete_me' +_KIND_EVENT = 'event' -def _maybe_forever(limit=None): - if limit is None: - while True: - yield - else: - for i in compat_range(0, limit): - yield +LOG = logging.getLogger(__name__) def _execute_task(task, arguments, progress_callback=None): @@ -99,12 +94,52 @@ def _revert_task(task, arguments, result, failures, progress_callback=None): return (REVERTED, result) -class _JoinedWorkItem(object): +class _Channel(object): + """Helper wrapper around a multiprocessing queue used by a worker.""" + + def __init__(self, queue, identity): + self._queue = queue + self._identity = identity + self._sent_messages = collections.defaultdict(int) + self._pid = None + + @property + def sent_messages(self): + return self._sent_messages + + def put(self, message): + # NOTE(harlowja): this is done in late in execution to ensure that this + # happens in the child process and not the parent process (where the + # constructor is called). + if self._pid is None: + self._pid = os.getpid() + message.update({ + 'sent_on': timeutils.utcnow(), + 'sender': { + 'pid': self._pid, + 'id': self._identity, + }, + }) + if 'body' not in message: + message['body'] = {} + try: + self._queue.put(message) + except _PICKLE_ERRORS: + LOG.warn("Failed serializing message %s", message, exc_info=True) + return False + except _SEND_ERRORS: + LOG.warn("Failed sending message %s", message, exc_info=True) + return False + else: + self._sent_messages[message['kind']] += 1 + return True + + +class _WaitWorkItem(object): """The piece of work that will executed by a process executor. - This will call the target function, then wait until the queues items - have been completed (via calls to task_done) before offically being - finished. + This will call the target function, then wait until the tasks emitted + events/items have been depleted before offically being finished. NOTE(harlowja): this is done so that the task function will *not* return until all of its notifications have been proxied back to its originating @@ -114,19 +149,28 @@ class _JoinedWorkItem(object): previously finished task... """ - def __init__(self, queue, func, task, *args, **kwargs): - self._queue = queue + def __init__(self, channel, barrier, + func, task, *args, **kwargs): + self._channel = channel + self._barrier = barrier self._func = func self._task = task self._args = args self._kwargs = kwargs def _on_finish(self): - w = timing.StopWatch() - w.start() - self._queue.join() - LOG.blather("Waited %0.2f seconds until task '%s' emitted" - " notifications were depleted", w.elapsed(), self._task) + sent_events = self._channel.sent_messages.get(_KIND_EVENT, 0) + if sent_events: + message = { + 'created_on': timeutils.utcnow(), + 'kind': _KIND_COMPLETE_ME, + } + if self._channel.put(message): + w = timing.StopWatch().start() + self._barrier.wait() + LOG.blather("Waited %s seconds until task '%s' %s emitted" + " notifications were depleted", w.elapsed(), + self._task, sent_events) def __call__(self): args = self._args @@ -140,57 +184,44 @@ class _JoinedWorkItem(object): class _EventSender(object): """Sends event information from a child worker process to its creator.""" - def __init__(self, queue): - self._queue = queue - self._pid = None + def __init__(self, channel): + self._channel = channel def __call__(self, event_type, details): - # NOTE(harlowja): this is done in late in execution to ensure that this - # happens in the child process and not the parent process (where the - # constructor is called). - if self._pid is None: - self._pid = os.getpid() message = { 'created_on': timeutils.utcnow(), - 'sender': { - 'pid': self._pid, - }, + 'kind': _KIND_EVENT, 'body': { 'event_type': event_type, 'details': details, }, } - try: - self._queue.put(message) - except _PICKLE_ERRORS: - LOG.warn("Failed serializing message %s", message, exc_info=True) - except _SEND_ERRORS: - LOG.warn("Failed sending message %s", message, exc_info=True) + self._channel.put(message) -class _EventTarget(object): - """An immutable helper object that represents a target of an event.""" +class _Target(object): + """An immutable helper object that represents a target of a message.""" - def __init__(self, future, task, queue): - self.future = future + def __init__(self, task, barrier, identity): self.task = task - self.queue = queue + self.barrier = barrier + self.identity = identity + # Counters used to track how many message 'kinds' were proxied... + self.dispatched = collections.defaultdict(int) + + def __repr__(self): + return "<%s at 0x%x targeting '%s' with identity '%s'>" % ( + reflection.get_class_name(self), id(self), + self.task, self.identity) -class _EventDispatcher(object): - """Dispatches event information received from child worker processes.""" +class _Dispatcher(object): + """Dispatches messages received from child worker processes.""" # When the run() method is busy (typically in a thread) we want to set # these so that the thread can know how long to sleep when there is no - # active work to dispatch (when there is active targets, there queues - # will have amount/count of items removed before returning to work on - # the next target...) + # active work to dispatch. _SPIN_PERIODICITY = 0.01 - _SPIN_DISPATCH_AMOUNT = 1 - - # TODO(harlowja): look again at using a non-polling mechanism that uses - # select instead of queues to achieve better ability to detect when - # messages are ready/available... def __init__(self, dispatch_periodicity=None): if dispatch_periodicity is None: @@ -198,83 +229,84 @@ class _EventDispatcher(object): if dispatch_periodicity <= 0: raise ValueError("Provided dispatch periodicity must be greater" " than zero and not '%s'" % dispatch_periodicity) - self._targets = set() + self._targets = {} self._dead = threading_utils.Event() - self._lock = threading.Lock() - self._periodicity = dispatch_periodicity + self._dispatch_periodicity = dispatch_periodicity self._stop_when_empty = False - def register(self, target): - with self._lock: - self._targets.add(target) - - def _dispatch_until_empty(self, target, limit=None): - it = _maybe_forever(limit=limit) - while True: - try: - six.next(it) - except StopIteration: - break - else: - try: - message = target.queue.get_nowait() - except compat_queue.Empty: - break - else: - try: - self._dispatch(target.task, message) - finally: - target.queue.task_done() - - def deregister(self, target): - with self._lock: - try: - self._targets.remove(target) - except KeyError: - pass + def register(self, identity, target): + self._targets[identity] = target + + def deregister(self, identity): + try: + target = self._targets.pop(identity) + except KeyError: + pass + else: + # Just incase set the barrier to unblock any worker... + target.barrier.set() + if LOG.isEnabledFor(logging.BLATHER): + LOG.blather("Dispatched %s messages %s to target '%s' during" + " the lifetime of its existence in the dispatcher", + sum(six.itervalues(target.dispatched)), + dict(target.dispatched), target) def reset(self): self._stop_when_empty = False - while self._targets: - self.deregister(self._targets.pop()) self._dead.clear() + if self._targets: + leftover = set(six.iterkeys(self._targets)) + while leftover: + self.deregister(leftover.pop()) def interrupt(self): self._stop_when_empty = True self._dead.set() - def _dispatch(self, task, message): - LOG.blather("Dispatching message %s to task '%s'", message, task) - body = message['body'] - task.notifier.notify(body['event_type'], body['details']) - - def _dispatch_iter(self, targets): - # A generator that yields at certain points to allow the main run() - # method to use this to dispatch in iterations (and also allows it - # to check if it has been stopped between iterations). - for target in targets: - if target not in self._targets: - # Must of been removed... - continue - # NOTE(harlowja): Limits are used here to avoid one - # task unequally dispatching, this forces round-robin - # like behavior... - self._dispatch_until_empty(target, - limit=self._SPIN_DISPATCH_AMOUNT) - yield target - - def run(self): - w = timing.StopWatch(duration=self._periodicity) + def _dispatch(self, message): + if LOG.isEnabledFor(logging.BLATHER): + LOG.blather("Dispatching message %s (it took %s seconds" + " for it to arrive for processing after being" + " sent)", message, + timeutils.delta_seconds(message['sent_on'], + timeutils.utcnow())) + try: + kind = message['kind'] + sender = message['sender'] + body = message['body'] + except (KeyError, ValueError, TypeError): + LOG.warn("Badly formatted message %s received", message, + exc_info=True) + return + target = self._targets.get(sender['id']) + if target is None: + # Must of been removed... + return + if kind == _KIND_COMPLETE_ME: + target.dispatched[kind] += 1 + target.barrier.set() + elif kind == _KIND_EVENT: + task = target.task + target.dispatched[kind] += 1 + task.notifier.notify(body['event_type'], body['details']) + else: + LOG.warn("Unknown message '%s' found in message from sender" + " %s to target '%s'", kind, sender, target) + + def run(self, queue): + w = timing.StopWatch(duration=self._dispatch_periodicity) while (not self._dead.is_set() or (self._stop_when_empty and self._targets)): w.restart() - with self._lock: - targets = self._targets.copy() - for _target in self._dispatch_iter(targets): - if self._stop_when_empty: - continue - if self._dead.is_set(): + leftover = w.leftover() + while leftover: + try: + message = queue.get(timeout=leftover) + except compat_queue.Empty: break + else: + self._dispatch(message) + leftover = w.leftover() leftover = w.leftover() if leftover: self._dead.wait(leftover) @@ -402,12 +434,11 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor): super(ParallelProcessTaskExecutor, self).__init__( executor=executor, max_workers=max_workers) self._manager = multiprocessing.Manager() - self._dispatcher = _EventDispatcher( + self._dispatcher = _Dispatcher( dispatch_periodicity=dispatch_periodicity) + # Only created after starting... self._worker = None - - def _queue_factory(self): - return self._manager.JoinableQueue() + self._queue = None def _create_executor(self, max_workers=None): return futures.ProcessPoolExecutor(max_workers=max_workers) @@ -423,7 +454,9 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor): self._manager.start() if not threading_utils.is_alive(self._worker): self._dispatcher.reset() - self._worker = threading_utils.daemon_thread(self._dispatcher.run) + self._queue = self._manager.Queue() + self._worker = threading_utils.daemon_thread(self._dispatcher.run, + self._queue) self._worker.start() def stop(self): @@ -432,11 +465,12 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor): if threading_utils.is_alive(self._worker): self._worker.join() self._worker = None + self._queue = None self._dispatcher.reset() self._manager.shutdown() self._manager.join() - def _rebind_task(self, task, clone, queue, progress_callback=None): + def _rebind_task(self, task, clone, channel, progress_callback=None): # Creates and binds proxies for all events the task could receive # so that when the clone runs in another process that this task # can recieve the same notifications (thus making it look like the @@ -448,8 +482,7 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor): if progress_callback is not None: needed.add(_UPDATE_PROGRESS) for event_type in needed: - clone.notifier.register(event_type, _EventSender(queue)) - return needed + clone.notifier.register(event_type, _EventSender(channel)) def _submit_task(self, func, task, *args, **kwargs): """Submit a function to run the given task (with given args/kwargs). @@ -474,49 +507,44 @@ class ParallelProcessTaskExecutor(ParallelTaskExecutor): any listeners) and then reattach a new set of listeners that will now instead of calling the desired listeners just place messages for this process (a dispatcher thread that is created in this class) - to dispatch to the original task (using a per task queue that is used - and associated to know which task to proxy back too, since it is - possible that there many be *many* subprocess running at the same - time, each running a different task). + to dispatch to the original task (using a common queue + per task + sender identity/target that is used and associated to know which task + to proxy back too, since it is possible that there many be *many* + subprocess running at the same time, each running a different task + and using the same common queue to submit messages back to). Once the subprocess task has finished execution, the executor will - then trigger a callback (``on_done`` in this case) that will remove - the task + queue from the dispatcher (which will stop any further - proxying back to the original task). + then trigger a callback that will remove the task + target from the + dispatcher (which will stop any further proxying back to the original + task). """ progress_callback = kwargs.pop('progress_callback', None) clone = task.copy(retain_listeners=False) - queue = self._queue_factory() - bound = self._rebind_task(task, clone, queue, - progress_callback=progress_callback) - LOG.blather("Bound %s event types to clone of '%s'", bound, task) - if progress_callback is not None: - binder = functools.partial(task.notifier.register, - _UPDATE_PROGRESS, progress_callback) - unbinder = functools.partial(task.notifier.deregister, - _UPDATE_PROGRESS, progress_callback) - else: - binder = unbinder = lambda: None - - # Ensure the target task (not the clone) is ready and able to receive - # dispatched messages (and start the dispatching process by - # registering) with the dispatcher. - binder() - work = _JoinedWorkItem(queue, func, clone, *args, **kwargs) + identity = uuidutils.generate_uuid() + target = _Target(task, self._manager.Event(), identity) + channel = _Channel(self._queue, identity) + self._rebind_task(task, clone, channel, + progress_callback=progress_callback) + + def register(): + if progress_callback is not None: + task.notifier.register(_UPDATE_PROGRESS, progress_callback) + self._dispatcher.register(identity, target) + + def deregister(): + if progress_callback is not None: + task.notifier.deregister(_UPDATE_PROGRESS, progress_callback) + self._dispatcher.deregister(identity) + + register() + work = _WaitWorkItem(channel, target.barrier, + func, clone, *args, **kwargs) try: fut = self._executor.submit(work) except RuntimeError: with excutils.save_and_reraise_exception(): - unbinder() - - # This will trigger the proxying to begin... - target = _EventTarget(fut, task, queue) - self._dispatcher.register(target) - - def on_done(unbinder, target, fut): - self._dispatcher.deregister(target) - unbinder() + deregister() fut.atom = task - fut.add_done_callback(functools.partial(on_done, unbinder, target)) + fut.add_done_callback(lambda fut: deregister()) return fut |