# (c) 2012-2014, Michael DeHaan # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . # Make coding more python3-ish from __future__ import (absolute_import, division, print_function) __metaclass__ = type import cmd import functools import os import pprint import queue import sys import threading import time import typing as t from collections import deque from multiprocessing import Lock from jinja2.exceptions import UndefinedError from ansible import constants as C from ansible import context from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleUndefinedVariable, AnsibleParserError from ansible.executor import action_write_locks from ansible.executor.play_iterator import IteratingStates, PlayIterator from ansible.executor.process.worker import WorkerProcess from ansible.executor.task_result import TaskResult from ansible.executor.task_queue_manager import CallbackSend, DisplaySend, PromptSend from ansible.module_utils.six import string_types from ansible.module_utils.common.text.converters import to_text from ansible.module_utils.connection import Connection, ConnectionError from ansible.playbook.conditional import Conditional from ansible.playbook.handler import Handler from ansible.playbook.helpers import load_list_of_blocks from ansible.playbook.task import Task from ansible.playbook.task_include import TaskInclude from ansible.plugins import loader as plugin_loader from ansible.template import Templar from ansible.utils.display import Display from ansible.utils.fqcn import add_internal_fqcns from ansible.utils.multiprocessing import context as multiprocessing_context from ansible.utils.unsafe_proxy import wrap_var from ansible.utils.vars import combine_vars, isidentifier from ansible.vars.clean import strip_internal_keys, module_response_deepcopy display = Display() __all__ = ['StrategyBase'] # This list can be an exact match, or start of string bound # does not accept regex ALWAYS_DELEGATE_FACT_PREFIXES = frozenset(( 'discovered_interpreter_', )) class StrategySentinel: pass _sentinel = StrategySentinel() def post_process_whens(result, task, templar, task_vars): cond = None if task.changed_when: with templar.set_temporary_context(available_variables=task_vars): cond = Conditional(loader=templar._loader) cond.when = task.changed_when result['changed'] = cond.evaluate_conditional(templar, templar.available_variables) if task.failed_when: with templar.set_temporary_context(available_variables=task_vars): if cond is None: cond = Conditional(loader=templar._loader) cond.when = task.failed_when failed_when_result = cond.evaluate_conditional(templar, templar.available_variables) result['failed_when_result'] = result['failed'] = failed_when_result def _get_item_vars(result, task): item_vars = {} if task.loop or task.loop_with: loop_var = result.get('ansible_loop_var', 'item') index_var = result.get('ansible_index_var') if loop_var in result: item_vars[loop_var] = result[loop_var] if index_var and index_var in result: item_vars[index_var] = result[index_var] if '_ansible_item_label' in result: item_vars['_ansible_item_label'] = result['_ansible_item_label'] if 'ansible_loop' in result: item_vars['ansible_loop'] = result['ansible_loop'] return item_vars def results_thread_main(strategy): while True: try: result = strategy._final_q.get() if isinstance(result, StrategySentinel): break elif isinstance(result, DisplaySend): display.display(*result.args, **result.kwargs) elif isinstance(result, CallbackSend): for arg in result.args: if isinstance(arg, TaskResult): strategy.normalize_task_result(arg) break strategy._tqm.send_callback(result.method_name, *result.args, **result.kwargs) elif isinstance(result, TaskResult): strategy.normalize_task_result(result) with strategy._results_lock: strategy._results.append(result) elif isinstance(result, PromptSend): try: value = display.prompt_until( result.prompt, private=result.private, seconds=result.seconds, complete_input=result.complete_input, interrupt_input=result.interrupt_input, ) except AnsibleError as e: value = e except BaseException as e: # relay unexpected errors so bugs in display are reported and don't cause workers to hang try: raise AnsibleError(f"{e}") from e except AnsibleError as e: value = e strategy._workers[result.worker_id].worker_queue.put(value) else: display.warning('Received an invalid object (%s) in the result queue: %r' % (type(result), result)) except (IOError, EOFError): break except queue.Empty: pass def debug_closure(func): """Closure to wrap ``StrategyBase._process_pending_results`` and invoke the task debugger""" @functools.wraps(func) def inner(self, iterator, one_pass=False, max_passes=None): status_to_stats_map = ( ('is_failed', 'failures'), ('is_unreachable', 'dark'), ('is_changed', 'changed'), ('is_skipped', 'skipped'), ) # We don't know the host yet, copy the previous states, for lookup after we process new results prev_host_states = iterator.host_states.copy() results = func(self, iterator, one_pass=one_pass, max_passes=max_passes) _processed_results = [] for result in results: task = result._task host = result._host _queued_task_args = self._queued_task_cache.pop((host.name, task._uuid), None) task_vars = _queued_task_args['task_vars'] play_context = _queued_task_args['play_context'] # Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state try: prev_host_state = prev_host_states[host.name] except KeyError: prev_host_state = iterator.get_host_state(host) while result.needs_debugger(globally_enabled=self.debugger_active): next_action = NextAction() dbg = Debugger(task, host, task_vars, play_context, result, next_action) dbg.cmdloop() if next_action.result == NextAction.REDO: # rollback host state self._tqm.clear_failed_hosts() if task.run_once and iterator._play.strategy in add_internal_fqcns(('linear',)) and result.is_failed(): for host_name, state in prev_host_states.items(): if host_name == host.name: continue iterator.set_state_for_host(host_name, state) iterator._play._removed_hosts.remove(host_name) iterator.set_state_for_host(host.name, prev_host_state) for method, what in status_to_stats_map: if getattr(result, method)(): self._tqm._stats.decrement(what, host.name) self._tqm._stats.decrement('ok', host.name) # redo self._queue_task(host, task, task_vars, play_context) _processed_results.extend(debug_closure(func)(self, iterator, one_pass)) break elif next_action.result == NextAction.CONTINUE: _processed_results.append(result) break elif next_action.result == NextAction.EXIT: # Matches KeyboardInterrupt from bin/ansible sys.exit(99) else: _processed_results.append(result) return _processed_results return inner class StrategyBase: ''' This is the base class for strategy plugins, which contains some common code useful to all strategies like running handlers, cleanup actions, etc. ''' # by default, strategies should support throttling but we allow individual # strategies to disable this and either forego supporting it or managing # the throttling internally (as `free` does) ALLOW_BASE_THROTTLING = True def __init__(self, tqm): self._tqm = tqm self._inventory = tqm.get_inventory() self._workers = tqm._workers self._variable_manager = tqm.get_variable_manager() self._loader = tqm.get_loader() self._final_q = tqm._final_q self._step = context.CLIARGS.get('step', False) self._diff = context.CLIARGS.get('diff', False) # the task cache is a dictionary of tuples of (host.name, task._uuid) # used to find the original task object of in-flight tasks and to store # the task args/vars and play context info used to queue the task. self._queued_task_cache = {} # Backwards compat: self._display isn't really needed, just import the global display and use that. self._display = display # internal counters self._pending_results = 0 self._cur_worker = 0 # this dictionary is used to keep track of hosts that have # outstanding tasks still in queue self._blocked_hosts = dict() self._results = deque() self._results_lock = threading.Condition(threading.Lock()) self._worker_queues = dict() # create the result processing thread for reading results in the background self._results_thread = threading.Thread(target=results_thread_main, args=(self,)) self._results_thread.daemon = True self._results_thread.start() # holds the list of active (persistent) connections to be shutdown at # play completion self._active_connections = dict() # Caches for get_host calls, to avoid calling excessively # These values should be set at the top of the ``run`` method of each # strategy plugin. Use ``_set_hosts_cache`` to set these values self._hosts_cache = [] self._hosts_cache_all = [] self.debugger_active = C.ENABLE_TASK_DEBUGGER def _set_hosts_cache(self, play, refresh=True): """Responsible for setting _hosts_cache and _hosts_cache_all See comment in ``__init__`` for the purpose of these caches """ if not refresh and all((self._hosts_cache, self._hosts_cache_all)): return if not play.finalized and Templar(None).is_template(play.hosts): _pattern = 'all' else: _pattern = play.hosts or 'all' self._hosts_cache_all = [h.name for h in self._inventory.get_hosts(pattern=_pattern, ignore_restrictions=True)] self._hosts_cache = [h.name for h in self._inventory.get_hosts(play.hosts, order=play.order)] def cleanup(self): # close active persistent connections for sock in self._active_connections.values(): try: conn = Connection(sock) conn.reset() except ConnectionError as e: # most likely socket is already closed display.debug("got an error while closing persistent connection: %s" % e) self._final_q.put(_sentinel) self._results_thread.join() def run(self, iterator, play_context, result=0): # execute one more pass through the iterator without peeking, to # make sure that all of the hosts are advanced to their final task. # This should be safe, as everything should be IteratingStates.COMPLETE by # this point, though the strategy may not advance the hosts itself. for host in self._hosts_cache: if host not in self._tqm._unreachable_hosts: try: iterator.get_next_task_for_host(self._inventory.hosts[host]) except KeyError: iterator.get_next_task_for_host(self._inventory.get_host(host)) # return the appropriate code, depending on the status hosts after the run if not isinstance(result, bool) and result != self._tqm.RUN_OK: return result elif len(self._tqm._unreachable_hosts.keys()) > 0: return self._tqm.RUN_UNREACHABLE_HOSTS elif len(iterator.get_failed_hosts()) > 0: return self._tqm.RUN_FAILED_HOSTS else: return self._tqm.RUN_OK def get_hosts_remaining(self, play): self._set_hosts_cache(play, refresh=False) ignore = set(self._tqm._failed_hosts).union(self._tqm._unreachable_hosts) return [host for host in self._hosts_cache if host not in ignore] def get_failed_hosts(self, play): self._set_hosts_cache(play, refresh=False) return [host for host in self._hosts_cache if host in self._tqm._failed_hosts] def add_tqm_variables(self, vars, play): ''' Base class method to add extra variables/information to the list of task vars sent through the executor engine regarding the task queue manager state. ''' vars['ansible_current_hosts'] = self.get_hosts_remaining(play) vars['ansible_failed_hosts'] = self.get_failed_hosts(play) def _queue_task(self, host, task, task_vars, play_context): ''' handles queueing the task up to be sent to a worker ''' display.debug("entering _queue_task() for %s/%s" % (host.name, task.action)) # Add a write lock for tasks. # Maybe this should be added somewhere further up the call stack but # this is the earliest in the code where we have task (1) extracted # into its own variable and (2) there's only a single code path # leading to the module being run. This is called by two # functions: linear.py::run(), and # free.py::run() so we'd have to add to both to do it there. # The next common higher level is __init__.py::run() and that has # tasks inside of play_iterator so we'd have to extract them to do it # there. if task.action not in action_write_locks.action_write_locks: display.debug('Creating lock for %s' % task.action) action_write_locks.action_write_locks[task.action] = Lock() # create a templar and template things we need later for the queuing process templar = Templar(loader=self._loader, variables=task_vars) try: throttle = int(templar.template(task.throttle)) except Exception as e: raise AnsibleError("Failed to convert the throttle value to an integer.", obj=task._ds, orig_exc=e) # and then queue the new task try: # Determine the "rewind point" of the worker list. This means we start # iterating over the list of workers until the end of the list is found. # Normally, that is simply the length of the workers list (as determined # by the forks or serial setting), however a task/block/play may "throttle" # that limit down. rewind_point = len(self._workers) if throttle > 0 and self.ALLOW_BASE_THROTTLING: if task.run_once: display.debug("Ignoring 'throttle' as 'run_once' is also set for '%s'" % task.get_name()) else: if throttle <= rewind_point: display.debug("task: %s, throttle: %d" % (task.get_name(), throttle)) rewind_point = throttle queued = False starting_worker = self._cur_worker while True: if self._cur_worker >= rewind_point: self._cur_worker = 0 worker_prc = self._workers[self._cur_worker] if worker_prc is None or not worker_prc.is_alive(): self._queued_task_cache[(host.name, task._uuid)] = { 'host': host, 'task': task, 'task_vars': task_vars, 'play_context': play_context } # Pass WorkerProcess its strategy worker number so it can send an identifier along with intra-task requests worker_prc = WorkerProcess( self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, plugin_loader, self._cur_worker, ) self._workers[self._cur_worker] = worker_prc self._tqm.send_callback('v2_runner_on_start', host, task) worker_prc.start() display.debug("worker is %d (out of %d available)" % (self._cur_worker + 1, len(self._workers))) queued = True self._cur_worker += 1 if self._cur_worker >= rewind_point: self._cur_worker = 0 if queued: break elif self._cur_worker == starting_worker: time.sleep(0.0001) self._pending_results += 1 except (EOFError, IOError, AssertionError) as e: # most likely an abort display.debug("got an error while queuing: %s" % e) return display.debug("exiting _queue_task() for %s/%s" % (host.name, task.action)) def get_task_hosts(self, iterator, task_host, task): if task.run_once: host_list = [host for host in self._hosts_cache if host not in self._tqm._unreachable_hosts] else: host_list = [task_host.name] return host_list def get_delegated_hosts(self, result, task): host_name = result.get('_ansible_delegated_vars', {}).get('ansible_delegated_host', None) return [host_name or task.delegate_to] def _set_always_delegated_facts(self, result, task): """Sets host facts for ``delegate_to`` hosts for facts that should always be delegated This operation mutates ``result`` to remove the always delegated facts See ``ALWAYS_DELEGATE_FACT_PREFIXES`` """ if task.delegate_to is None: return facts = result['ansible_facts'] always_keys = set() _add = always_keys.add for fact_key in facts: for always_key in ALWAYS_DELEGATE_FACT_PREFIXES: if fact_key.startswith(always_key): _add(fact_key) if always_keys: _pop = facts.pop always_facts = { 'ansible_facts': dict((k, _pop(k)) for k in list(facts) if k in always_keys) } host_list = self.get_delegated_hosts(result, task) _set_host_facts = self._variable_manager.set_host_facts for target_host in host_list: _set_host_facts(target_host, always_facts) def normalize_task_result(self, task_result): """Normalize a TaskResult to reference actual Host and Task objects when only given the ``Host.name``, or the ``Task._uuid`` Only the ``Host.name`` and ``Task._uuid`` are commonly sent back from the ``TaskExecutor`` or ``WorkerProcess`` due to performance concerns Mutates the original object """ if isinstance(task_result._host, string_types): # If the value is a string, it is ``Host.name`` task_result._host = self._inventory.get_host(to_text(task_result._host)) if isinstance(task_result._task, string_types): # If the value is a string, it is ``Task._uuid`` queue_cache_entry = (task_result._host.name, task_result._task) try: found_task = self._queued_task_cache[queue_cache_entry]['task'] except KeyError: # This should only happen due to an implicit task created by the # TaskExecutor, restrict this behavior to the explicit use case # of an implicit async_status task if task_result._task_fields.get('action') != 'async_status': raise original_task = Task() else: original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) original_task._parent = found_task._parent original_task.from_attrs(task_result._task_fields) task_result._task = original_task return task_result def search_handlers_by_notification(self, notification: str, iterator: PlayIterator) -> t.Generator[Handler, None, None]: templar = Templar(None) # iterate in reversed order since last handler loaded with the same name wins for handler in (h for b in reversed(iterator._play.handlers) for h in b.block if h.name): if not handler.cached_name: if templar.is_template(handler.name): templar.available_variables = self._variable_manager.get_vars( play=iterator._play, task=handler, _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all ) try: handler.name = templar.template(handler.name) except (UndefinedError, AnsibleUndefinedVariable) as e: # We skip this handler due to the fact that it may be using # a variable in the name that was conditionally included via # set_fact or some other method, and we don't want to error # out unnecessarily if not handler.listen: display.warning( "Handler '%s' is unusable because it has no listen topics and " "the name could not be templated (host-specific variables are " "not supported in handler names). The error: %s" % (handler.name, to_text(e)) ) continue handler.cached_name = True # first we check with the full result of get_name(), which may # include the role name (if the handler is from a role). If that # is not found, we resort to the simple name field, which doesn't # have anything extra added to it. if notification in { handler.name, handler.get_name(include_role_fqcn=False), handler.get_name(include_role_fqcn=True), }: yield handler break templar.available_variables = {} for handler in (h for b in iterator._play.handlers for h in b.block): if listeners := handler.listen: if notification in handler.get_validated_value( 'listen', handler.fattributes.get('listen'), listeners, templar, ): yield handler @debug_closure def _process_pending_results(self, iterator, one_pass=False, max_passes=None): ''' Reads results off the final queue and takes appropriate action based on the result (executing callbacks, updating state, etc.). ''' ret_results = [] handler_templar = Templar(self._loader) cur_pass = 0 while True: try: self._results_lock.acquire() task_result = self._results.popleft() except IndexError: break finally: self._results_lock.release() original_host = task_result._host original_task = task_result._task # all host status messages contain 2 entries: (msg, task_result) role_ran = False if task_result.is_failed(): role_ran = True ignore_errors = original_task.ignore_errors if not ignore_errors: # save the current state before failing it for later inspection state_when_failed = iterator.get_state_for_host(original_host.name) display.debug("marking %s as failed" % original_host.name) if original_task.run_once: # if we're using run_once, we have to fail every host here for h in self._inventory.get_hosts(iterator._play.hosts): if h.name not in self._tqm._unreachable_hosts: iterator.mark_host_failed(h) else: iterator.mark_host_failed(original_host) state, _ = iterator.get_next_task_for_host(original_host, peek=True) if iterator.is_failed(original_host) and state and state.run_state == IteratingStates.COMPLETE: self._tqm._failed_hosts[original_host.name] = True # if we're iterating on the rescue portion of a block then # we save the failed task in a special var for use # within the rescue/always if iterator.is_any_block_rescuing(state_when_failed): self._tqm._stats.increment('rescued', original_host.name) iterator._play._removed_hosts.remove(original_host.name) self._variable_manager.set_nonpersistent_facts( original_host.name, dict( ansible_failed_task=wrap_var(original_task.serialize()), ansible_failed_result=task_result._result, ), ) else: self._tqm._stats.increment('failures', original_host.name) else: self._tqm._stats.increment('ok', original_host.name) self._tqm._stats.increment('ignored', original_host.name) if 'changed' in task_result._result and task_result._result['changed']: self._tqm._stats.increment('changed', original_host.name) self._tqm.send_callback('v2_runner_on_failed', task_result, ignore_errors=ignore_errors) elif task_result.is_unreachable(): ignore_unreachable = original_task.ignore_unreachable if not ignore_unreachable: self._tqm._unreachable_hosts[original_host.name] = True iterator._play._removed_hosts.append(original_host.name) self._tqm._stats.increment('dark', original_host.name) else: self._tqm._stats.increment('ok', original_host.name) self._tqm._stats.increment('ignored', original_host.name) self._tqm.send_callback('v2_runner_on_unreachable', task_result) elif task_result.is_skipped(): self._tqm._stats.increment('skipped', original_host.name) self._tqm.send_callback('v2_runner_on_skipped', task_result) else: role_ran = True if original_task.loop: # this task had a loop, and has more than one result, so # loop over all of them instead of a single result result_items = task_result._result.get('results', []) else: result_items = [task_result._result] for result_item in result_items: if '_ansible_notify' in result_item and task_result.is_changed(): # only ensure that notified handlers exist, if so save the notifications for when # handlers are actually flushed so the last defined handlers are exexcuted, # otherwise depending on the setting either error or warn for notification in result_item['_ansible_notify']: if any(self.search_handlers_by_notification(notification, iterator)): iterator.add_notification(original_host.name, notification) display.vv(f"Notification for handler {notification} has been saved.") continue msg = ( f"The requested handler '{notification}' was not found in either the main handlers" " list nor in the listening handlers list" ) if C.ERROR_ON_MISSING_HANDLER: raise AnsibleError(msg) else: display.warning(msg) if 'add_host' in result_item: # this task added a new host (add_host module) new_host_info = result_item.get('add_host', dict()) self._inventory.add_dynamic_host(new_host_info, result_item) # ensure host is available for subsequent plays if result_item.get('changed') and new_host_info['host_name'] not in self._hosts_cache_all: self._hosts_cache_all.append(new_host_info['host_name']) elif 'add_group' in result_item: # this task added a new group (group_by module) self._inventory.add_dynamic_group(original_host, result_item) if 'add_host' in result_item or 'add_group' in result_item: item_vars = _get_item_vars(result_item, original_task) found_task_vars = self._queued_task_cache.get((original_host.name, task_result._task._uuid))['task_vars'] if item_vars: all_task_vars = combine_vars(found_task_vars, item_vars) else: all_task_vars = found_task_vars all_task_vars[original_task.register] = wrap_var(result_item) post_process_whens(result_item, original_task, handler_templar, all_task_vars) if original_task.loop or original_task.loop_with: new_item_result = TaskResult( task_result._host, task_result._task, result_item, task_result._task_fields, ) self._tqm.send_callback('v2_runner_item_on_ok', new_item_result) if result_item.get('changed', False): task_result._result['changed'] = True if result_item.get('failed', False): task_result._result['failed'] = True if 'ansible_facts' in result_item and original_task.action not in C._ACTION_DEBUG: # if delegated fact and we are delegating facts, we need to change target host for them if original_task.delegate_to is not None and original_task.delegate_facts: host_list = self.get_delegated_hosts(result_item, original_task) else: # Set facts that should always be on the delegated hosts self._set_always_delegated_facts(result_item, original_task) host_list = self.get_task_hosts(iterator, original_host, original_task) if original_task.action in C._ACTION_INCLUDE_VARS: for (var_name, var_value) in result_item['ansible_facts'].items(): # find the host we're actually referring too here, which may # be a host that is not really in inventory at all for target_host in host_list: self._variable_manager.set_host_variable(target_host, var_name, var_value) else: cacheable = result_item.pop('_ansible_facts_cacheable', False) for target_host in host_list: # so set_fact is a misnomer but 'cacheable = true' was meant to create an 'actual fact' # to avoid issues with precedence and confusion with set_fact normal operation, # we set BOTH fact and nonpersistent_facts (aka hostvar) # when fact is retrieved from cache in subsequent operations it will have the lower precedence, # but for playbook setting it the 'higher' precedence is kept is_set_fact = original_task.action in C._ACTION_SET_FACT if not is_set_fact or cacheable: self._variable_manager.set_host_facts(target_host, result_item['ansible_facts'].copy()) if is_set_fact: self._variable_manager.set_nonpersistent_facts(target_host, result_item['ansible_facts'].copy()) if 'ansible_stats' in result_item and 'data' in result_item['ansible_stats'] and result_item['ansible_stats']['data']: if 'per_host' not in result_item['ansible_stats'] or result_item['ansible_stats']['per_host']: host_list = self.get_task_hosts(iterator, original_host, original_task) else: host_list = [None] data = result_item['ansible_stats']['data'] aggregate = 'aggregate' in result_item['ansible_stats'] and result_item['ansible_stats']['aggregate'] for myhost in host_list: for k in data.keys(): if aggregate: self._tqm._stats.update_custom_stats(k, data[k], myhost) else: self._tqm._stats.set_custom_stats(k, data[k], myhost) if 'diff' in task_result._result: if self._diff or getattr(original_task, 'diff', False): self._tqm.send_callback('v2_on_file_diff', task_result) if not isinstance(original_task, TaskInclude): self._tqm._stats.increment('ok', original_host.name) if 'changed' in task_result._result and task_result._result['changed']: self._tqm._stats.increment('changed', original_host.name) # finally, send the ok for this task self._tqm.send_callback('v2_runner_on_ok', task_result) # register final results if original_task.register: if not isidentifier(original_task.register): raise AnsibleError("Invalid variable name in 'register' specified: '%s'" % original_task.register) host_list = self.get_task_hosts(iterator, original_host, original_task) clean_copy = strip_internal_keys(module_response_deepcopy(task_result._result)) if 'invocation' in clean_copy: del clean_copy['invocation'] for target_host in host_list: self._variable_manager.set_nonpersistent_facts(target_host, {original_task.register: clean_copy}) self._pending_results -= 1 if original_host.name in self._blocked_hosts: del self._blocked_hosts[original_host.name] # If this is a role task, mark the parent role as being run (if # the task was ok or failed, but not skipped or unreachable) if original_task._role is not None and role_ran: # TODO: and original_task.action not in C._ACTION_INCLUDE_ROLE:? # lookup the role in the role cache to make sure we're dealing # with the correct object and mark it as executed role_obj = self._get_cached_role(original_task, iterator._play) role_obj._had_task_run[original_host.name] = True ret_results.append(task_result) if isinstance(original_task, Handler): for handler in (h for b in iterator._play.handlers for h in b.block if h._uuid == original_task._uuid): handler.remove_host(original_host) if one_pass or max_passes is not None and (cur_pass + 1) >= max_passes: break cur_pass += 1 return ret_results def _wait_on_pending_results(self, iterator): ''' Wait for the shared counter to drop to zero, using a short sleep between checks to ensure we don't spin lock ''' ret_results = [] display.debug("waiting for pending results...") while self._pending_results > 0 and not self._tqm._terminated: if self._tqm.has_dead_workers(): raise AnsibleError("A worker was found in a dead state") results = self._process_pending_results(iterator) ret_results.extend(results) if self._pending_results > 0: time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) display.debug("no more pending results, returning what we have") return ret_results def _copy_included_file(self, included_file): ''' A proven safe and performant way to create a copy of an included file ''' ti_copy = included_file._task.copy(exclude_parent=True) ti_copy._parent = included_file._task._parent temp_vars = ti_copy.vars | included_file._vars ti_copy.vars = temp_vars return ti_copy def _load_included_file(self, included_file, iterator, is_handler=False): ''' Loads an included YAML file of tasks, applying the optional set of variables. Raises AnsibleError exception in case of a failure during including a file, in such case the caller is responsible for marking the host(s) as failed using PlayIterator.mark_host_failed(). ''' display.debug("loading included file: %s" % included_file._filename) try: data = self._loader.load_from_file(included_file._filename) if data is None: return [] elif not isinstance(data, list): raise AnsibleError("included task files must contain a list of tasks") ti_copy = self._copy_included_file(included_file) block_list = load_list_of_blocks( data, play=iterator._play, parent_block=ti_copy.build_parent_block(), role=included_file._task._role, use_handlers=is_handler, loader=self._loader, variable_manager=self._variable_manager, ) # since we skip incrementing the stats when the task result is # first processed, we do so now for each host in the list for host in included_file._hosts: self._tqm._stats.increment('ok', host.name) except AnsibleParserError: raise except AnsibleError as e: if isinstance(e, AnsibleFileNotFound): reason = "Could not find or access '%s' on the Ansible Controller." % to_text(e.file_name) else: reason = to_text(e) for r in included_file._results: r._result['failed'] = True for host in included_file._hosts: tr = TaskResult(host=host, task=included_file._task, return_data=dict(failed=True, reason=reason)) self._tqm._stats.increment('failures', host.name) self._tqm.send_callback('v2_runner_on_failed', tr) raise AnsibleError(reason) from e # finally, send the callback and return the list of blocks loaded self._tqm.send_callback('v2_playbook_on_include', included_file) display.debug("done processing included file") return block_list def _take_step(self, task, host=None): ret = False msg = u'Perform task: %s ' % task if host: msg += u'on %s ' % host msg += u'(N)o/(y)es/(c)ontinue: ' resp = display.prompt(msg) if resp.lower() in ['y', 'yes']: display.debug("User ran task") ret = True elif resp.lower() in ['c', 'continue']: display.debug("User ran task and canceled step mode") self._step = False ret = True else: display.debug("User skipped task") display.banner(msg) return ret def _cond_not_supported_warn(self, task_name): display.warning("%s task does not support when conditional" % task_name) def _execute_meta(self, task, play_context, iterator, target_host): # meta tasks store their args in the _raw_params field of args, # since they do not use k=v pairs, so get that meta_action = task.args.get('_raw_params') def _evaluate_conditional(h): all_vars = self._variable_manager.get_vars(play=iterator._play, host=h, task=task, _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all) templar = Templar(loader=self._loader, variables=all_vars) return task.evaluate_conditional(templar, all_vars) skipped = False msg = meta_action skip_reason = '%s conditional evaluated to False' % meta_action if isinstance(task, Handler): self._tqm.send_callback('v2_playbook_on_handler_task_start', task) else: self._tqm.send_callback('v2_playbook_on_task_start', task, is_conditional=False) # These don't support "when" conditionals if meta_action in ('noop', 'refresh_inventory', 'reset_connection') and task.when: self._cond_not_supported_warn(meta_action) if meta_action == 'noop': msg = "noop" elif meta_action == 'flush_handlers': if _evaluate_conditional(target_host): host_state = iterator.get_state_for_host(target_host.name) # actually notify proper handlers based on all notifications up to this point for notification in list(host_state.handler_notifications): for handler in self.search_handlers_by_notification(notification, iterator): if handler.notify_host(target_host): # NOTE even with notifications deduplicated this can still happen in case of handlers being # notified multiple times using different names, like role name or fqcn self._tqm.send_callback('v2_playbook_on_notify', handler, target_host) iterator.clear_notification(target_host.name, notification) if host_state.run_state == IteratingStates.HANDLERS: raise AnsibleError('flush_handlers cannot be used as a handler') if target_host.name not in self._tqm._unreachable_hosts: host_state.pre_flushing_run_state = host_state.run_state host_state.run_state = IteratingStates.HANDLERS msg = "triggered running handlers for %s" % target_host.name else: skipped = True skip_reason += ', not running handlers for %s' % target_host.name elif meta_action == 'refresh_inventory': self._inventory.refresh_inventory() self._set_hosts_cache(iterator._play) msg = "inventory successfully refreshed" elif meta_action == 'clear_facts': if _evaluate_conditional(target_host): for host in self._inventory.get_hosts(iterator._play.hosts): hostname = host.get_name() self._variable_manager.clear_facts(hostname) msg = "facts cleared" else: skipped = True skip_reason += ', not clearing facts and fact cache for %s' % target_host.name elif meta_action == 'clear_host_errors': if _evaluate_conditional(target_host): for host in self._inventory.get_hosts(iterator._play.hosts): self._tqm._failed_hosts.pop(host.name, False) self._tqm._unreachable_hosts.pop(host.name, False) iterator.clear_host_errors(host) msg = "cleared host errors" else: skipped = True skip_reason += ', not clearing host error state for %s' % target_host.name elif meta_action == 'end_batch': if _evaluate_conditional(target_host): for host in self._inventory.get_hosts(iterator._play.hosts): if host.name not in self._tqm._unreachable_hosts: iterator.set_run_state_for_host(host.name, IteratingStates.COMPLETE) msg = "ending batch" else: skipped = True skip_reason += ', continuing current batch' elif meta_action == 'end_play': if _evaluate_conditional(target_host): for host in self._inventory.get_hosts(iterator._play.hosts): if host.name not in self._tqm._unreachable_hosts: iterator.set_run_state_for_host(host.name, IteratingStates.COMPLETE) # end_play is used in PlaybookExecutor/TQM to indicate that # the whole play is supposed to be ended as opposed to just a batch iterator.end_play = True msg = "ending play" else: skipped = True skip_reason += ', continuing play' elif meta_action == 'end_host': if _evaluate_conditional(target_host): iterator.set_run_state_for_host(target_host.name, IteratingStates.COMPLETE) iterator._play._removed_hosts.append(target_host.name) msg = "ending play for %s" % target_host.name else: skipped = True skip_reason += ", continuing execution for %s" % target_host.name # TODO: Nix msg here? Left for historical reasons, but skip_reason exists now. msg = "end_host conditional evaluated to false, continuing execution for %s" % target_host.name elif meta_action == 'role_complete': # Allow users to use this in a play as reported in https://github.com/ansible/ansible/issues/22286? # How would this work with allow_duplicates?? if task.implicit: role_obj = self._get_cached_role(task, iterator._play) role_obj._completed[target_host.name] = True msg = 'role_complete for %s' % target_host.name elif meta_action == 'reset_connection': all_vars = self._variable_manager.get_vars(play=iterator._play, host=target_host, task=task, _hosts=self._hosts_cache, _hosts_all=self._hosts_cache_all) templar = Templar(loader=self._loader, variables=all_vars) # apply the given task's information to the connection info, # which may override some fields already set by the play or # the options specified on the command line play_context = play_context.set_task_and_variable_override(task=task, variables=all_vars, templar=templar) # fields set from the play/task may be based on variables, so we have to # do the same kind of post validation step on it here before we use it. play_context.post_validate(templar=templar) # now that the play context is finalized, if the remote_addr is not set # default to using the host's address field as the remote address if not play_context.remote_addr: play_context.remote_addr = target_host.address # We also add "magic" variables back into the variables dict to make sure # a certain subset of variables exist. This 'mostly' works here cause meta # disregards the loop, but should not really use play_context at all play_context.update_vars(all_vars) if target_host in self._active_connections: connection = Connection(self._active_connections[target_host]) del self._active_connections[target_host] else: connection = plugin_loader.connection_loader.get(play_context.connection, play_context, os.devnull) connection.set_options(task_keys=task.dump_attrs(), var_options=all_vars) play_context.set_attributes_from_plugin(connection) if connection: try: connection.reset() msg = 'reset connection' except ConnectionError as e: # most likely socket is already closed display.debug("got an error while closing persistent connection: %s" % e) else: msg = 'no connection, nothing to reset' else: raise AnsibleError("invalid meta action requested: %s" % meta_action, obj=task._ds) result = {'msg': msg} if skipped: result['skipped'] = True result['skip_reason'] = skip_reason else: result['changed'] = False if not task.implicit: header = skip_reason if skipped else msg display.vv(f"META: {header}") if isinstance(task, Handler): task.remove_host(target_host) res = TaskResult(target_host, task, result) if skipped: self._tqm.send_callback('v2_runner_on_skipped', res) return [res] def _get_cached_role(self, task, play): role_path = task._role.get_role_path() role_cache = play.role_cache[role_path] try: idx = role_cache.index(task._role) return role_cache[idx] except ValueError: raise AnsibleError(f'Cannot locate {task._role.get_name()} in role cache') def get_hosts_left(self, iterator): ''' returns list of available hosts for this iterator by filtering out unreachables ''' hosts_left = [] for host in self._hosts_cache: if host not in self._tqm._unreachable_hosts: try: hosts_left.append(self._inventory.hosts[host]) except KeyError: hosts_left.append(self._inventory.get_host(host)) return hosts_left def update_active_connections(self, results): ''' updates the current active persistent connections ''' for r in results: if 'args' in r._task_fields: socket_path = r._task_fields['args'].get('_ansible_socket') if socket_path: if r._host not in self._active_connections: self._active_connections[r._host] = socket_path class NextAction(object): """ The next action after an interpreter's exit. """ REDO = 1 CONTINUE = 2 EXIT = 3 def __init__(self, result=EXIT): self.result = result class Debugger(cmd.Cmd): prompt_continuous = '> ' # multiple lines def __init__(self, task, host, task_vars, play_context, result, next_action): # cmd.Cmd is old-style class cmd.Cmd.__init__(self) self.prompt = '[%s] %s (debug)> ' % (host, task) self.intro = None self.scope = {} self.scope['task'] = task self.scope['task_vars'] = task_vars self.scope['host'] = host self.scope['play_context'] = play_context self.scope['result'] = result self.next_action = next_action def cmdloop(self): try: cmd.Cmd.cmdloop(self) except KeyboardInterrupt: pass do_h = cmd.Cmd.do_help def do_EOF(self, args): """Quit""" return self.do_quit(args) def do_quit(self, args): """Quit""" display.display('User interrupted execution') self.next_action.result = NextAction.EXIT return True do_q = do_quit def do_continue(self, args): """Continue to next result""" self.next_action.result = NextAction.CONTINUE return True do_c = do_continue def do_redo(self, args): """Schedule task for re-execution. The re-execution may not be the next result""" self.next_action.result = NextAction.REDO return True do_r = do_redo def do_update_task(self, args): """Recreate the task from ``task._ds``, and template with updated ``task_vars``""" templar = Templar(None, variables=self.scope['task_vars']) task = self.scope['task'] task = task.load_data(task._ds) task.post_validate(templar) self.scope['task'] = task do_u = do_update_task def evaluate(self, args): try: return eval(args, globals(), self.scope) except Exception: t, v = sys.exc_info()[:2] if isinstance(t, str): exc_type_name = t else: exc_type_name = t.__name__ display.display('***%s:%s' % (exc_type_name, repr(v))) raise def do_pprint(self, args): """Pretty Print""" try: result = self.evaluate(args) display.display(pprint.pformat(result)) except Exception: pass do_p = do_pprint def execute(self, args): try: code = compile(args + '\n', '', 'single') exec(code, globals(), self.scope) except Exception: t, v = sys.exc_info()[:2] if isinstance(t, str): exc_type_name = t else: exc_type_name = t.__name__ display.display('***%s:%s' % (exc_type_name, repr(v))) raise def default(self, line): try: self.execute(line) except Exception: pass