Diffstat (limited to 'lib/ansible/plugins')
-rw-r--r--  lib/ansible/plugins/loader.py             |  16
-rw-r--r--  lib/ansible/plugins/strategy/__init__.py  | 100
2 files changed, 21 insertions(+), 95 deletions(-)
diff --git a/lib/ansible/plugins/loader.py b/lib/ansible/plugins/loader.py
index 1e1dc84a6c..0ca284bd64 100644
--- a/lib/ansible/plugins/loader.py
+++ b/lib/ansible/plugins/loader.py
@@ -15,6 +15,7 @@ import sys
import warnings
from collections import defaultdict
+from threading import Lock
from ansible import constants as C
from ansible.plugins import get_plugin_class, MODULE_CACHE, PATH_CACHE, PLUGIN_PATH_CACHE
@@ -73,6 +74,7 @@ class PluginLoader:
self._extra_dirs = []
self._searched_paths = set()
+ self._lock = Lock()
def __setstate__(self, data):
'''
@@ -360,11 +362,14 @@ class PluginLoader:
if path is None:
return None
+ self._lock.acquire()
if path not in self._module_cache:
self._module_cache[path] = self._load_module_source(name, path)
found_in_cache = False
obj = getattr(self._module_cache[path], self.class_name)
+ self._lock.release()
+
if self.base_class:
# The import path is hardcoded and should be the right place,
# so we are not expecting an ImportError.
@@ -423,15 +428,18 @@ class PluginLoader:
yield path
continue
- if path not in self._module_cache:
- self._module_cache[path] = self._load_module_source(name, path)
- found_in_cache = False
-
try:
+ self._lock.acquire()
+ if path not in self._module_cache:
+ self._module_cache[path] = self._load_module_source(name, path)
+ found_in_cache = False
+
obj = getattr(self._module_cache[path], self.class_name)
except AttributeError as e:
display.warning("Skipping plugin (%s) as it seems to be invalid: %s" % (path, to_text(e)))
continue
+ finally:
+ self._lock.release()
if self.base_class:
# The import path is hardcoded and should be the right place,
diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py
index 6937e6f723..755c7e4354 100644
--- a/lib/ansible/plugins/strategy/__init__.py
+++ b/lib/ansible/plugins/strategy/__init__.py
@@ -30,17 +30,15 @@ from jinja2.exceptions import UndefinedError
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable
from ansible.executor import action_write_locks
-from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_result import TaskResult
from ansible.inventory.host import Host
-from ansible.module_utils.six.moves import queue as Queue
from ansible.module_utils.six import iteritems, string_types
from ansible.module_utils._text import to_text
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.included_file import IncludedFile
from ansible.playbook.task_include import TaskInclude
from ansible.playbook.role_include import IncludeRole
-from ansible.plugins.loader import action_loader, connection_loader, filter_loader, lookup_loader, module_loader, test_loader
+from ansible.plugins.loader import action_loader, connection_loader
from ansible.template import Templar
from ansible.utils.vars import combine_vars
from ansible.vars.manager import strip_internal_keys
@@ -55,50 +53,11 @@ except ImportError:
__all__ = ['StrategyBase']
-class StrategySentinel:
- pass
-
-
-# TODO: this should probably be in the plugins/__init__.py, with
-# a smarter mechanism to set all of the attributes based on
-# the loaders created there
-class SharedPluginLoaderObj:
- '''
- A simple object to make pass the various plugin loaders to
- the forked processes over the queue easier
- '''
- def __init__(self):
- self.action_loader = action_loader
- self.connection_loader = connection_loader
- self.filter_loader = filter_loader
- self.test_loader = test_loader
- self.lookup_loader = lookup_loader
- self.module_loader = module_loader
-
-_sentinel = StrategySentinel()
-
-
-def results_thread_main(strategy):
- while True:
- try:
- result = strategy._final_q.get()
- if isinstance(result, StrategySentinel):
- break
- else:
- strategy._results_lock.acquire()
- strategy._results.append(result)
- strategy._results_lock.release()
- except (IOError, EOFError):
- break
- except Queue.Empty:
- pass
-
-
class StrategyBase:
'''
This is the base class for strategy plugins, which contains some common
- code useful to all strategies like running handlers, cleanup actions, etc.
+ code useful to all strategies like running handlers, etc.
'''
def __init__(self, tqm):
@@ -109,7 +68,6 @@ class StrategyBase:
self._listening_handlers = tqm._listening_handlers
self._variable_manager = tqm.get_variable_manager()
self._loader = tqm.get_loader()
- self._final_q = tqm._final_q
self._step = getattr(tqm._options, 'step', False)
self._diff = getattr(tqm._options, 'diff', False)
@@ -118,24 +76,13 @@ class StrategyBase:
# internal counters
self._pending_results = 0
- self._cur_worker = 0
# this dictionary is used to keep track of hosts that have
# outstanding tasks still in queue
self._blocked_hosts = dict()
- self._results = deque()
self._results_lock = threading.Condition(threading.Lock())
- # create the result processing thread for reading results in the background
- self._results_thread = threading.Thread(target=results_thread_main, args=(self,))
- self._results_thread.daemon = True
- self._results_thread.start()
-
- def cleanup(self):
- self._final_q.put(_sentinel)
- self._results_thread.join()
-
def run(self, iterator, play_context, result=0):
# execute one more pass through the iterator without peeking, to
# make sure that all of the hosts are advanced to their final task.
@@ -203,38 +150,13 @@ class StrategyBase:
if task.action not in action_write_locks.action_write_locks:
display.debug('Creating lock for %s' % task.action)
- action_write_locks.action_write_locks[task.action] = Lock()
-
- # and then queue the new task
- try:
-
- # create a dummy object with plugin loaders set as an easier
- # way to share them with the forked processes
- shared_loader_obj = SharedPluginLoaderObj()
-
- queued = False
- starting_worker = self._cur_worker
- while True:
- (worker_prc, rslt_q) = self._workers[self._cur_worker]
- if worker_prc is None or not worker_prc.is_alive():
- worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
- self._workers[self._cur_worker][0] = worker_prc
- worker_prc.start()
- display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers)))
- queued = True
- self._cur_worker += 1
- if self._cur_worker >= len(self._workers):
- self._cur_worker = 0
- if queued:
- break
- elif self._cur_worker == starting_worker:
- time.sleep(0.0001)
+ action_write_locks.action_write_locks[task.action] = threading.Lock()
+ if self._tqm.put_job((host, task, play_context, task_vars)):
self._pending_results += 1
- except (EOFError, IOError, AssertionError) as e:
- # most likely an abort
- display.debug("got an error while queuing: %s" % e)
- return
+ else:
+ raise AnsibleError('Could not put the job')
+
display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action))
def get_task_hosts(self, iterator, task_host, task):
@@ -325,13 +247,9 @@ class StrategyBase:
cur_pass = 0
while True:
- try:
- self._results_lock.acquire()
- task_result = self._results.popleft()
- except IndexError:
+ task_result = self._tqm.get_result()
+ if task_result is None:
break
- finally:
- self._results_lock.release()
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
original_host = get_original_host(task_result._host)
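
The strategy changes above drop the per-strategy worker pool, results deque, and background results thread, and instead hand work to the task queue manager with put_job() and drain finished results with get_result(), which returns None when nothing is pending. A minimal sketch of that queue contract, using a hypothetical MiniTQM rather than the real TaskQueueManager:

    # Minimal sketch of the put_job()/get_result() contract relied on above.
    # MiniTQM is an illustrative stand-in, not the real TaskQueueManager.
    import queue

    class MiniTQM:
        def __init__(self, maxsize=0):
            self._jobs = queue.Queue(maxsize)
            self._results = queue.Queue()

        def put_job(self, job):
            # Report whether the job was accepted so the caller can decide
            # to bump _pending_results or raise, as _queue_task() does.
            try:
                self._jobs.put_nowait(job)
                return True
            except queue.Full:
                return False

        def get_result(self):
            # Return None when nothing is pending so the caller's loop can
            # simply break, as in _process_pending_results() above.
            try:
                return self._results.get_nowait()
            except queue.Empty:
                return None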