diff options
author | James Cammarata <jimi@sngx.net> | 2017-06-01 05:50:39 -0500 |
---|---|---|
committer | James Cammarata <jimi@sngx.net> | 2017-10-02 12:25:00 -0500 |
commit | 5e6364751749b6bc30031edad724e53545e6c671 (patch) | |
tree | 7a6ab325a41fde0991191565d6238fdd94730fb8 | |
parent | f95160723dbea49111e43f3dc0147631be9240f6 (diff) | |
download | ansible-5e6364751749b6bc30031edad724e53545e6c671.tar.gz |
Use persistent threads with job/results queues
Instead of creating a one-time use thread for every host/task
-rw-r--r-- | lib/ansible/executor/process/threading.py | 103 | ||||
-rw-r--r-- | lib/ansible/executor/task_executor.py | 9 | ||||
-rw-r--r-- | lib/ansible/executor/task_queue_manager.py | 67 | ||||
-rw-r--r-- | lib/ansible/plugins/strategy/__init__.py | 118 |
4 files changed, 140 insertions, 157 deletions
diff --git a/lib/ansible/executor/process/threading.py b/lib/ansible/executor/process/threading.py index 63ef2a6118..0f249b4609 100644 --- a/lib/ansible/executor/process/threading.py +++ b/lib/ansible/executor/process/threading.py @@ -21,6 +21,7 @@ __metaclass__ = type import os import sys +import time import traceback from jinja2.exceptions import TemplateNotFound @@ -39,7 +40,7 @@ except ImportError: __all__ = ['WorkerProcess'] -def run_worker(task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj): +def run_worker(tqm, shared_loader_obj): ''' The worker thread class, which uses TaskExecutor to run tasks read from a job queue and pushes results into a results queue @@ -50,51 +51,59 @@ def run_worker(task_vars, host, task, play_context, loader, variable_manager, sh # pr = cProfile.Profile() # pr.enable() - try: - # execute the task and build a TaskResult from the result - display.debug("running TaskExecutor() for %s/%s" % (host, task)) - executor_result = TaskExecutor( - host, - task, - task_vars, - play_context, - None, #new_stdin - loader, - shared_loader_obj, - None, #rslt_q - ).run() - - display.debug("done running TaskExecutor() for %s/%s" % (host, task)) - task_result = TaskResult( - host, - task, - executor_result, - ) - - # put the result on the result queue - display.debug("sending task result") - return task_result - - except AnsibleConnectionFailure: - return TaskResult( - host, - task, - dict(unreachable=True), - ) - - except Exception as e: - if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound): - try: - return TaskResult( - host, - task, - dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''), - ) - except: - display.debug(u"WORKER EXCEPTION: %s" % to_text(e)) - display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc())) - - display.debug("WORKER PROCESS EXITING") + display.debug("STARTING WORKER") + while not tqm._terminated: + job = tqm.get_job() + if job is None: + time.sleep(0.0001) + continue + + display.debug("WORKER GOT A JOB") + (host, task, play_context, task_vars) = job + + try: + # execute the task and build a TaskResult from the result + display.debug("running TaskExecutor() for %s/%s" % (host, task)) + executor_result = TaskExecutor( + host, + task, + task_vars, + play_context, + None, #new_stdin + tqm._loader, + shared_loader_obj, + tqm, #rslt_q + ).run() + + display.debug("done running TaskExecutor() for %s/%s" % (host, task)) + + # put the result on the result queue + display.debug("sending task result") + tqm.put_result(TaskResult( + host, + task, + executor_result, + )) + display.debug("done task result") + + except AnsibleConnectionFailure: + tqm.put_result(TaskResult( + host, + task, + dict(unreachable=True), + )) + + except Exception as e: + if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound): + try: + tqm.put_result(TaskResult( + host, + task, + dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''), + )) + except: + display.debug(u"WORKER EXCEPTION: %s" % to_text(e)) + display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc())) # pr.disable() # s = StringIO.StringIO() @@ -103,3 +112,5 @@ def run_worker(task_vars, host, task, play_context, loader, variable_manager, sh # ps.print_stats() # with open('worker_%06d.stats' % os.getpid(), 'w') as f: # f.write(s.getvalue()) + + display.debug("WORKER PROCESS EXITING") diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py index cd84a55a45..1f4a94ec00 100644 --- a/lib/ansible/executor/task_executor.py +++ b/lib/ansible/executor/task_executor.py @@ -299,14 +299,13 @@ class TaskExecutor: templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=self._job_vars) res['_ansible_item_label'] = templar.template(label) - self._rslt_q.put( + self._rslt_q.put_result( TaskResult( - self._host.name, - self._task._uuid, + self._host, + self._task, res, task_fields=self._task.dump_attrs(), ), - block=False, ) results.append(res) del task_vars[loop_var] @@ -591,7 +590,7 @@ class TaskExecutor: result['_ansible_retry'] = True result['retries'] = retries display.debug('Retrying task, attempt %d of %d' % (attempt, retries)) - self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs()), block=False) + self._rslt_q.put_result(TaskResult(self._host, self._task, result, task_fields=self._task.dump_attrs())) time.sleep(delay) else: if retries > 1: diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index d2cade56cf..03cf96f2c9 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -23,17 +23,19 @@ import threading import os import tempfile +from collections import deque from concurrent.futures import ThreadPoolExecutor as PoolExecutor from ansible import constants as C from ansible.errors import AnsibleError from ansible.executor.play_iterator import PlayIterator +from ansible.executor.process.threading import run_worker from ansible.executor.stats import AggregateStats from ansible.module_utils.six import string_types from ansible.module_utils._text import to_text from ansible.playbook.block import Block from ansible.playbook.play_context import PlayContext -from ansible.plugins.loader import callback_loader, strategy_loader, module_loader +from ansible.plugins.loader import action_loader, connection_loader, filter_loader, test_loader, lookup_loader, callback_loader, strategy_loader, module_loader from ansible.plugins.callback import CallbackBase from ansible.template import Templar from ansible.utils.helpers import pct_to_int @@ -50,6 +52,23 @@ except ImportError: __all__ = ['TaskQueueManager'] +# TODO: this should probably be in the plugins/__init__.py, with +# a smarter mechanism to set all of the attributes based on +# the loaders created there +class SharedPluginLoaderObj: + ''' + A simple object to make pass the various plugin loaders to + the forked processes over the queue easier + ''' + def __init__(self): + self.action_loader = action_loader + self.connection_loader = connection_loader + self.filter_loader = filter_loader + self.test_loader = test_loader + self.lookup_loader = lookup_loader + self.module_loader = module_loader + + class TaskQueueManager: ''' @@ -107,6 +126,39 @@ class TaskQueueManager: self._connection_lockfile = tempfile.TemporaryFile() self._executor = None + self._job_queue = deque() + self._job_queue_lock = threading.Lock() + + self._res_queue = deque() + self._res_queue_lock = threading.Lock() + + def _put_in_queue(self, data, queue, lock): + lock.acquire() + queue.appendleft(data) + lock.release() + + def _pop_off_queue(self, queue, lock): + try: + data = None + lock.acquire() + data = queue.pop() + except: + pass + finally: + lock.release() + return data + + def put_job(self, data): + self._put_in_queue(data, self._job_queue, self._job_queue_lock) + + def get_job(self): + return self._pop_off_queue(self._job_queue, self._job_queue_lock) + + def put_result(self, data): + self._put_in_queue(data, self._res_queue, self._res_queue_lock) + + def get_result(self): + return self._pop_off_queue(self._res_queue, self._res_queue_lock) def _initialize_processes(self, num): # FIXME: be safe about creating this @@ -114,9 +166,18 @@ class TaskQueueManager: # FIXME: do we need a global lock for workers here instead of a per-worker? self._workers = [] + # create a dummy object with plugin loaders set as an easier + # way to share them with the forked processes + shared_loader_obj = SharedPluginLoaderObj() + for i in range(num): + w_thread = self._executor.submit( + run_worker, + self, + shared_loader_obj + ) w_lock = threading.Lock() - self._workers.append([None, w_lock]) + self._workers.append([w_thread, w_lock]) def _initialize_notified_handlers(self, play): ''' @@ -322,7 +383,7 @@ class TaskQueueManager: def _cleanup_processes(self): if hasattr(self, '_workers'): for (w_thread, w_lock) in self._workers: - if w_thread and w_thread.is_running(): + if w_thread and not w_thread.running(): w_thread.cancel() def clear_failed_hosts(self): diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index af191cc892..b111e5f4ea 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -29,8 +29,6 @@ from jinja2.exceptions import UndefinedError from ansible import constants as C from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable from ansible.executor import action_write_locks -#from ansible.executor.process.worker import WorkerProcess -from ansible.executor.process.threading import run_worker from ansible.executor.task_result import TaskResult from ansible.inventory.host import Host from ansible.module_utils.six.moves import queue as Queue @@ -54,47 +52,6 @@ except ImportError: __all__ = ['StrategyBase'] -# TODO: this should probably be in the plugins/__init__.py, with -# a smarter mechanism to set all of the attributes based on -# the loaders created there -class SharedPluginLoaderObj: - ''' - A simple object to make pass the various plugin loaders to - the forked processes over the queue easier - ''' - def __init__(self): - self.action_loader = action_loader - self.connection_loader = connection_loader - self.filter_loader = filter_loader - self.test_loader = test_loader - self.lookup_loader = lookup_loader - self.module_loader = module_loader - -def results_thread_main(strategy): - while not strategy._tqm._terminated: - try: - did_work = False - for idx, slot in enumerate(strategy._tqm._workers): - (w_thread, w_lock) = slot - try: - w_lock.acquire() - if w_thread and w_thread.done(): - result = w_thread.result() - try: - strategy._results_lock.acquire() - strategy._results.append(result) - finally: - strategy._results_lock.release() - strategy._tqm._workers[idx] = [None, w_lock] - did_work = True - finally: - w_lock.release() - if not did_work: - time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) - except Exception as e: - pass - print("RESULTS THREAD EXITED!!!") - class StrategyBase: @@ -125,17 +82,8 @@ class StrategyBase: # outstanding tasks still in queue self._blocked_hosts = dict() - self._results = deque() - self._results_lock = threading.Condition(threading.Lock()) - - # create the result processing thread for reading results in the background - self._results_thread = threading.Thread(target=results_thread_main, args=(self,)) - self._results_thread.daemon = True - self._results_thread.start() - def cleanup(self): self._tqm.terminate() - self._results_thread.join() def run(self, iterator, play_context, result=0): # execute one more pass through the iterator without peeking, to @@ -206,43 +154,9 @@ class StrategyBase: display.debug('Creating lock for %s' % task.action) action_write_locks.action_write_locks[task.action] = threading.Lock() - # and then queue the new task - try: - # create a dummy object with plugin loaders set as an easier - # way to share them with the forked processes - shared_loader_obj = SharedPluginLoaderObj() - - queued = False - starting_worker = self._cur_worker - while True: - (w_thread, w_lock) = self._workers[self._cur_worker] - if w_thread is None: - w_thread = self._tqm._executor.submit( - run_worker, - task_vars, - host, - task, - play_context, - self._loader, - self._variable_manager, - shared_loader_obj - ) - self._workers[self._cur_worker][0] = w_thread - display.debug("worker is %d (out of %d available)" % (self._cur_worker+1, len(self._workers))) - queued = True - self._cur_worker += 1 - if self._cur_worker >= len(self._workers): - self._cur_worker = 0 - if queued: - break - elif self._cur_worker == starting_worker: - time.sleep(0.0001) - - self._pending_results += 1 - except (EOFError, IOError, AssertionError) as e: - # most likely an abort - display.debug("got an error while queuing: %s" % e) - return + self._tqm.put_job((host, task, play_context, task_vars)) + self._pending_results += 1 + display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action)) def get_task_hosts(self, iterator, task_host, task): @@ -333,23 +247,21 @@ class StrategyBase: cur_pass = 0 while True: - try: - self._results_lock.acquire() - task_result = self._results.popleft() - except IndexError: + task_result = self._tqm.get_result() + if task_result is None: break - finally: - self._results_lock.release() # get the original host and task. We then assign them to the TaskResult for use in callbacks/etc. - original_host = get_original_host(task_result._host) - found_task = iterator.get_original_task(original_host, task_result._task) - original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) - original_task._parent = found_task._parent - original_task.from_attrs(task_result._task_fields) - - task_result._host = original_host - task_result._task = original_task + #original_host = get_original_host(task_result._host) + #found_task = iterator.get_original_task(original_host, task_result._task) + #original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) + #original_task._parent = found_task._parent + #original_task.from_attrs(task_result._task_fields) + + #task_result._host = original_host + #task_result._task = original_task + original_host = task_result._host + original_task = task_result._task # get the correct loop var for use later if original_task.loop_control: |