summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Cammarata <jimi@sngx.net>2017-06-01 05:50:39 -0500
committerJames Cammarata <jimi@sngx.net>2017-10-02 12:25:00 -0500
commit5e6364751749b6bc30031edad724e53545e6c671 (patch)
tree7a6ab325a41fde0991191565d6238fdd94730fb8
parentf95160723dbea49111e43f3dc0147631be9240f6 (diff)
downloadansible-5e6364751749b6bc30031edad724e53545e6c671.tar.gz
Use persistent threads with job/results queues
Instead of creating a one-time use thread for every host/task
-rw-r--r--lib/ansible/executor/process/threading.py103
-rw-r--r--lib/ansible/executor/task_executor.py9
-rw-r--r--lib/ansible/executor/task_queue_manager.py67
-rw-r--r--lib/ansible/plugins/strategy/__init__.py118
4 files changed, 140 insertions, 157 deletions
diff --git a/lib/ansible/executor/process/threading.py b/lib/ansible/executor/process/threading.py
index 63ef2a6118..0f249b4609 100644
--- a/lib/ansible/executor/process/threading.py
+++ b/lib/ansible/executor/process/threading.py
@@ -21,6 +21,7 @@ __metaclass__ = type
import os
import sys
+import time
import traceback
from jinja2.exceptions import TemplateNotFound
@@ -39,7 +40,7 @@ except ImportError:
__all__ = ['WorkerProcess']
-def run_worker(task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj):
+def run_worker(tqm, shared_loader_obj):
'''
The worker thread class, which uses TaskExecutor to run tasks
read from a job queue and pushes results into a results queue
@@ -50,51 +51,59 @@ def run_worker(task_vars, host, task, play_context, loader, variable_manager, sh
# pr = cProfile.Profile()
# pr.enable()
- try:
- # execute the task and build a TaskResult from the result
- display.debug("running TaskExecutor() for %s/%s" % (host, task))
- executor_result = TaskExecutor(
- host,
- task,
- task_vars,
- play_context,
- None, #new_stdin
- loader,
- shared_loader_obj,
- None, #rslt_q
- ).run()
-
- display.debug("done running TaskExecutor() for %s/%s" % (host, task))
- task_result = TaskResult(
- host,
- task,
- executor_result,
- )
-
- # put the result on the result queue
- display.debug("sending task result")
- return task_result
-
- except AnsibleConnectionFailure:
- return TaskResult(
- host,
- task,
- dict(unreachable=True),
- )
-
- except Exception as e:
- if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound):
- try:
- return TaskResult(
- host,
- task,
- dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''),
- )
- except:
- display.debug(u"WORKER EXCEPTION: %s" % to_text(e))
- display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc()))
-
- display.debug("WORKER PROCESS EXITING")
+ display.debug("STARTING WORKER")
+ while not tqm._terminated:
+ job = tqm.get_job()
+ if job is None:
+ time.sleep(0.0001)
+ continue
+
+ display.debug("WORKER GOT A JOB")
+ (host, task, play_context, task_vars) = job
+
+ try:
+ # execute the task and build a TaskResult from the result
+ display.debug("running TaskExecutor() for %s/%s" % (host, task))
+ executor_result = TaskExecutor(
+ host,
+ task,
+ task_vars,
+ play_context,
+ None, #new_stdin
+ tqm._loader,
+ shared_loader_obj,
+ tqm, #rslt_q
+ ).run()
+
+ display.debug("done running TaskExecutor() for %s/%s" % (host, task))
+
+ # put the result on the result queue
+ display.debug("sending task result")
+ tqm.put_result(TaskResult(
+ host,
+ task,
+ executor_result,
+ ))
+ display.debug("done task result")
+
+ except AnsibleConnectionFailure:
+ tqm.put_result(TaskResult(
+ host,
+ task,
+ dict(unreachable=True),
+ ))
+
+ except Exception as e:
+ if not isinstance(e, (IOError, EOFError, KeyboardInterrupt, SystemExit)) or isinstance(e, TemplateNotFound):
+ try:
+ tqm.put_result(TaskResult(
+ host,
+ task,
+ dict(failed=True, exception=to_text(traceback.format_exc()), stdout=''),
+ ))
+ except:
+ display.debug(u"WORKER EXCEPTION: %s" % to_text(e))
+ display.debug(u"WORKER TRACEBACK: %s" % to_text(traceback.format_exc()))
# pr.disable()
# s = StringIO.StringIO()
@@ -103,3 +112,5 @@ def run_worker(task_vars, host, task, play_context, loader, variable_manager, sh
# ps.print_stats()
# with open('worker_%06d.stats' % os.getpid(), 'w') as f:
# f.write(s.getvalue())
+
+ display.debug("WORKER PROCESS EXITING")
diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py
index cd84a55a45..1f4a94ec00 100644
--- a/lib/ansible/executor/task_executor.py
+++ b/lib/ansible/executor/task_executor.py
@@ -299,14 +299,13 @@ class TaskExecutor:
templar = Templar(loader=self._loader, shared_loader_obj=self._shared_loader_obj, variables=self._job_vars)
res['_ansible_item_label'] = templar.template(label)
- self._rslt_q.put(
+ self._rslt_q.put_result(
TaskResult(
- self._host.name,
- self._task._uuid,
+ self._host,
+ self._task,
res,
task_fields=self._task.dump_attrs(),
),
- block=False,
)
results.append(res)
del task_vars[loop_var]
@@ -591,7 +590,7 @@ class TaskExecutor:
result['_ansible_retry'] = True
result['retries'] = retries
display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
- self._rslt_q.put(TaskResult(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs()), block=False)
+ self._rslt_q.put_result(TaskResult(self._host, self._task, result, task_fields=self._task.dump_attrs()))
time.sleep(delay)
else:
if retries > 1:
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index d2cade56cf..03cf96f2c9 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -23,17 +23,19 @@ import threading
import os
import tempfile
+from collections import deque
from concurrent.futures import ThreadPoolExecutor as PoolExecutor
from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.executor.play_iterator import PlayIterator
+from ansible.executor.process.threading import run_worker
from ansible.executor.stats import AggregateStats
from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_text
from ansible.playbook.block import Block
from ansible.playbook.play_context import PlayContext
-from ansible.plugins.loader import callback_loader, strategy_loader, module_loader
+from ansible.plugins.loader import action_loader, connection_loader, filter_loader, test_loader, lookup_loader, callback_loader, strategy_loader, module_loader
from ansible.plugins.callback import CallbackBase
from ansible.template import Templar
from ansible.utils.helpers import pct_to_int
@@ -50,6 +52,23 @@ except ImportError:
__all__ = ['TaskQueueManager']
+# TODO: this should probably be in the plugins/__init__.py, with
+# a smarter mechanism to set all of the attributes based on
+# the loaders created there
+class SharedPluginLoaderObj:
+ '''
+ A simple object to make pass the various plugin loaders to
+ the forked processes over the queue easier
+ '''
+ def __init__(self):
+ self.action_loader = action_loader
+ self.connection_loader = connection_loader
+ self.filter_loader = filter_loader
+ self.test_loader = test_loader
+ self.lookup_loader = lookup_loader
+ self.module_loader = module_loader
+
+
class TaskQueueManager:
'''
@@ -107,6 +126,39 @@ class TaskQueueManager:
self._connection_lockfile = tempfile.TemporaryFile()
self._executor = None
+ self._job_queue = deque()
+ self._job_queue_lock = threading.Lock()
+
+ self._res_queue = deque()
+ self._res_queue_lock = threading.Lock()
+
+ def _put_in_queue(self, data, queue, lock):
+ lock.acquire()
+ queue.appendleft(data)
+ lock.release()
+
+ def _pop_off_queue(self, queue, lock):
+ try:
+ data = None
+ lock.acquire()
+ data = queue.pop()
+ except:
+ pass
+ finally:
+ lock.release()
+ return data
+
+ def put_job(self, data):
+ self._put_in_queue(data, self._job_queue, self._job_queue_lock)
+
+ def get_job(self):
+ return self._pop_off_queue(self._job_queue, self._job_queue_lock)
+
+ def put_result(self, data):
+ self._put_in_queue(data, self._res_queue, self._res_queue_lock)
+
+ def get_result(self):
+ return self._pop_off_queue(self._res_queue, self._res_queue_lock)
def _initialize_processes(self, num):
# FIXME: be safe about creating this
@@ -114,9 +166,18 @@ class TaskQueueManager:
# FIXME: do we need a global lock for workers here instead of a per-worker?
self._workers = []
+ # create a dummy object with plugin loaders set as an easier
+ # way to share them with the forked processes
+ shared_loader_obj = SharedPluginLoaderObj()
+
for i in range(num):
+ w_thread = self._executor.submit(
+ run_worker,
+ self,
+ shared_loader_obj
+ )
w_lock = threading.Lock()
- self._workers.append([None, w_lock])
+ self._workers.append([w_thread, w_lock])
def _initialize_notified_handlers(self, play):
'''
@@ -322,7 +383,7 @@ class TaskQueueManager:
def _cleanup_processes(self):
if hasattr(self, '_workers'):
for (w_thread, w_lock) in self._workers:
- if w_thread and w_thread.is_running():
+ if w_thread and not w_thread.running():
w_thread.cancel()
def clear_failed_hosts(self):
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index af191cc892..b111e5f4ea 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -29,8 +29,6 @@ from jinja2.exceptions import UndefinedError
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor import action_write_locks
-#from ansible.executor.process.worker import WorkerProcess
-from ansible.executor.process.threading import run_worker
from ansible.executor.task_result import TaskResult
from ansible.inventory.host import Host
from ansible.module_utils.six.moves import queue as Queue
@@ -54,47 +52,6 @@ except ImportError:
__all__ = ['StrategyBase']
-# TODO: this should probably be in the plugins/__init__.py, with
-# a smarter mechanism to set all of the attributes based on
-# the loaders created there
-class SharedPluginLoaderObj:
- '''
- A simple object to make pass the various plugin loaders to
- the forked processes over the queue easier
- '''
- def __init__(self):
- self.action_loader = action_loader
- self.connection_loader = connection_loader
- self.filter_loader = filter_loader
- self.test_loader = test_loader
- self.lookup_loader = lookup_loader
- self.module_loader = module_loader
-
-def results_thread_main(strategy):
- while not strategy._tqm._terminated:
- try:
- did_work = False
- for idx, slot in enumerate(strategy._tqm._workers):
- (w_thread, w_lock) = slot
- try:
- w_lock.acquire()
- if w_thread and w_thread.done():
- result = w_thread.result()
- try:
- strategy._results_lock.acquire()
- strategy._results.append(result)
- finally:
- strategy._results_lock.release()
- strategy._tqm._workers[idx] = [None, w_lock]
- did_work = True
- finally:
- w_lock.release()
- if not did_work:
- time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)
- except Exception as e:
- pass
- print("RESULTS THREAD EXITED!!!")
-
class StrategyBase:
@@ -125,17 +82,8 @@ class StrategyBase:
# outstanding tasks still in queue
self._blocked_hosts = dict()
- self._results = deque()
- self._results_lock = threading.Condition(threading.Lock())
-
- # create the result processing thread for reading results in the background
- self._results_thread = threading.Thread(target=results_thread_main, args=(self,))
- self._results_thread.daemon = True
- self._results_thread.start()
-
def cleanup(self):
self._tqm.terminate()
- self._results_thread.join()
def run(self, iterator, play_context, result=0):
# execute one more pass through the iterator without peeking, to
@@ -206,43 +154,9 @@ class StrategyBase:
display.debug('Creating lock for %s' % task.action)
action_write_locks.action_write_locks[task.action] = threading.Lock()
- # and then queue the new task
- try:
- # create a dummy object with plugin loaders set as an easier
- # way to share them with the forked processes
- shared_loader_obj = SharedPluginLoaderObj()
-
- queued = False
- starting_worker = self._cur_worker
- while True:
- (w_thread, w_lock) = self._workers[self._cur_worker]
- if w_thread is None:
- w_thread = self._tqm._executor.submit(
- run_worker,
- task_vars,
- host,
- task,
- play_context,
- self._loader,
- self._variable_manager,
- shared_loader_obj
- )
- self._workers[self._cur_worker][0] = w_thread
- display.debug("worker is %d (out of %d available)" % (self._cur_worker+1, len(self._workers)))
- queued = True
- self._cur_worker += 1
- if self._cur_worker >= len(self._workers):
- self._cur_worker = 0
- if queued:
- break
- elif self._cur_worker == starting_worker:
- time.sleep(0.0001)
-
- self._pending_results += 1
- except (EOFError, IOError, AssertionError) as e:
- # most likely an abort
- display.debug("got an error while queuing: %s" % e)
- return
+ self._tqm.put_job((host, task, play_context, task_vars))
+ self._pending_results += 1
+
display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action))
def get_task_hosts(self, iterator, task_host, task):
@@ -333,23 +247,21 @@ class StrategyBase:
cur_pass = 0
while True:
- try:
- self._results_lock.acquire()
- task_result = self._results.popleft()
- except IndexError:
+ task_result = self._tqm.get_result()
+ if task_result is None:
break
- finally:
- self._results_lock.release()
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
- original_host = get_original_host(task_result._host)
- found_task = iterator.get_original_task(original_host, task_result._task)
- original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
- original_task._parent = found_task._parent
- original_task.from_attrs(task_result._task_fields)
-
- task_result._host = original_host
- task_result._task = original_task
+ #original_host = get_original_host(task_result._host)
+ #found_task = iterator.get_original_task(original_host, task_result._task)
+ #original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
+ #original_task._parent = found_task._parent
+ #original_task.from_attrs(task_result._task_fields)
+
+ #task_result._host = original_host
+ #task_result._task = original_task
+ original_host = task_result._host
+ original_task = task_result._task
# get the correct loop var for use later
if original_task.loop_control: