From 9c0275a879d96eb9aee5e66987b1f07a013f23db Mon Sep 17 00:00:00 2001 From: Nathaniel Case Date: Thu, 9 Nov 2017 15:04:40 -0500 Subject: Connection plugins network_cli and netconf (#32521) * implements jsonrpc message passing for ansible-connection * implements more generic mechanism for persistent connections * starts persistent connection in task_executor if enabled and supported * supports using network_cli as top level connection plugin * enhances logging for persistent connection to stdout * Update action plugins * Fix Python3 RPC * Fix Junos bytes<-->str issues * supports using netconf as top level connection plugin * Error message when running netconf on an unsupported platform * Update tests * Fix `authorize: yes` for `connection: local` * Handle potentially JSON data in terminal * Add clarifying detail if possible on ConnectionError --- bin/ansible-connection | 351 +++++++++++++++++++------------------------------ 1 file changed, 138 insertions(+), 213 deletions(-) (limited to 'bin') diff --git a/bin/ansible-connection b/bin/ansible-connection index dd727743da..7fbceedbd9 100755 --- a/bin/ansible-connection +++ b/bin/ansible-connection @@ -1,23 +1,6 @@ #!/usr/bin/env python - -# (c) 2017, Ansible, Inc. -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - -######################################################## +# Copyright: (c) 2017, Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import (absolute_import, division, print_function) __metaclass__ = type @@ -36,91 +19,68 @@ import socket import sys import time import traceback -import datetime import errno +import json from ansible import constants as C from ansible.module_utils._text import to_bytes, to_native, to_text from ansible.module_utils.six import PY3 from ansible.module_utils.six.moves import cPickle from ansible.module_utils.connection import send_data, recv_data +from ansible.module_utils.service import fork_process from ansible.playbook.play_context import PlayContext from ansible.plugins.loader import connection_loader from ansible.utils.path import unfrackpath, makedirs_safe -from ansible.errors import AnsibleConnectionFailure +from ansible.errors import AnsibleError from ansible.utils.display import Display +from ansible.utils.jsonrpc import JsonRpcServer -def do_fork(): +class ConnectionProcess(object): ''' - Does the required double fork for a daemon process. Based on - http://code.activestate.com/recipes/66012-fork-a-daemon-process-on-unix/ + The connection process wraps around a Connection object that manages + the connection to a remote device that persists over the playbook ''' - try: - pid = os.fork() - if pid > 0: - return pid - # This is done as a 'good practice' for daemons, but we need to keep the cwd - # leaving it here as a note that we KNOW its good practice but are not doing it on purpose. - # os.chdir("/") - os.setsid() - os.umask(0) - - try: - pid = os.fork() - if pid > 0: - sys.exit(0) - - if C.DEFAULT_LOG_PATH != '': - out_file = open(C.DEFAULT_LOG_PATH, 'ab+') - err_file = open(C.DEFAULT_LOG_PATH, 'ab+', 0) - else: - out_file = open('/dev/null', 'ab+') - err_file = open('/dev/null', 'ab+', 0) - - os.dup2(out_file.fileno(), sys.stdout.fileno()) - os.dup2(err_file.fileno(), sys.stderr.fileno()) - os.close(sys.stdin.fileno()) - - return pid - except OSError as e: - sys.exit(1) - except OSError as e: - sys.exit(1) - - -class Server(): - - def __init__(self, socket_path, play_context): - self.socket_path = socket_path + def __init__(self, fd, play_context, socket_path, original_path): self.play_context = play_context + self.socket_path = socket_path + self.original_path = original_path - display.display( - 'creating new control socket for host %s:%s as user %s' % - (play_context.remote_addr, play_context.port, play_context.remote_user), - log_only=True - ) - - display.display('control socket path is %s' % socket_path, log_only=True) - display.display('current working directory is %s' % os.getcwd(), log_only=True) - - self._start_time = datetime.datetime.now() - - display.display("using connection plugin %s" % self.play_context.connection, log_only=True) - - self.connection = connection_loader.get(play_context.connection, play_context, sys.stdin) - self.connection._connect() - - if not self.connection.connected: - raise AnsibleConnectionFailure('unable to connect to remote host %s' % self._play_context.remote_addr) + self.fd = fd + self.exception = None - connection_time = datetime.datetime.now() - self._start_time - display.display('connection established to %s in %s' % (play_context.remote_addr, connection_time), log_only=True) + self.srv = JsonRpcServer() + self.sock = None - self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.socket.bind(self.socket_path) - self.socket.listen(1) - display.display('local socket is set to listening', log_only=True) + def start(self): + try: + messages = list() + result = {} + + messages.append('control socket path is %s' % self.socket_path) + + # If this is a relative path (~ gets expanded later) then plug the + # key's path on to the directory we originally came from, so we can + # find it now that our cwd is / + if self.play_context.private_key_file and self.play_context.private_key_file[0] not in '~/': + self.play_context.private_key_file = os.path.join(self.original_path, self.play_context.private_key_file) + + self.connection = connection_loader.get(self.play_context.connection, self.play_context, '/dev/null') + self.connection._connect() + self.srv.register(self.connection) + messages.append('connection to remote device started successfully') + + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.bind(self.socket_path) + self.sock.listen(1) + messages.append('local domain socket listeners started successfully') + except Exception as exc: + result['error'] = to_text(exc) + result['exception'] = traceback.format_exc() + finally: + result['messages'] = messages + self.fd.write(json.dumps(result)) + self.fd.close() def run(self): try: @@ -129,53 +89,36 @@ class Server(): signal.signal(signal.SIGTERM, self.handler) signal.alarm(C.PERSISTENT_CONNECT_TIMEOUT) - (s, addr) = self.socket.accept() - display.display('incoming request accepted on persistent socket', log_only=True) + self.exception = None + (s, addr) = self.sock.accept() signal.alarm(0) + signal.signal(signal.SIGALRM, self.command_timeout) while True: data = recv_data(s) if not data: break - signal.signal(signal.SIGALRM, self.command_timeout) - signal.alarm(self.play_context.timeout) - - op = to_text(data.split(b':')[0]) - display.display('socket operation is %s' % op, log_only=True) - - method = getattr(self, 'do_%s' % op, None) - - rc = 255 - stdout = stderr = '' - - if not method: - stderr = 'Invalid action specified' - else: - rc, stdout, stderr = method(data) - + signal.alarm(self.connection._play_context.timeout) + resp = self.srv.handle_request(data) signal.alarm(0) - display.display('socket operation completed with rc %s' % rc, log_only=True) - - send_data(s, to_bytes(rc)) - send_data(s, to_bytes(stdout)) - send_data(s, to_bytes(stderr)) + send_data(s, to_bytes(resp)) s.close() except Exception as e: # socket.accept() will raise EINTR if the socket.close() is called - if e.errno != errno.EINTR: - display.display(traceback.format_exc(), log_only=True) + if hasattr(e, 'errno'): + if e.errno != errno.EINTR: + self.exception = traceback.format_exc() + else: + self.exception = traceback.format_exc() finally: # when done, close the connection properly and cleanup # the socket file so it can be recreated self.shutdown() - end_time = datetime.datetime.now() - delta = end_time - self._start_time - display.display('shutdown local socket, connection was active for %s secs' % delta, log_only=True) def connect_timeout(self, signum, frame): display.display('persistent connection idle timeout triggered, timeout value is %s secs' % C.PERSISTENT_CONNECT_TIMEOUT, log_only=True) @@ -190,25 +133,25 @@ class Server(): self.shutdown() def shutdown(self): - display.display('shutdown persistent connection requested', log_only=True) - + """ Shuts down the local domain socket + """ if not os.path.exists(self.socket_path): - display.display('persistent connection is not active', log_only=True) return try: - if self.socket: - display.display('closing local listener', log_only=True) - self.socket.close() + if self.sock: + self.sock.close() if self.connection: - display.display('closing the connection', log_only=True) self.connection.close() + except: pass + finally: if os.path.exists(self.socket_path): - display.display('removing the local control socket', log_only=True) os.remove(self.socket_path) + setattr(self.connection, '_socket_path', None) + setattr(self.connection, '_connected', False) display.display('shutdown complete', log_only=True) @@ -262,6 +205,13 @@ def communicate(sock, data): def main(): + """ Called to initiate the connect to the remote device + """ + rc = 0 + result = {} + messages = list() + socket_path = None + # Need stdin as a byte stream if PY3: stdin = sys.stdin.buffer @@ -270,116 +220,91 @@ def main(): try: # read the play context data via stdin, which means depickling it - # FIXME: as noted above, we will probably need to deserialize the - # connection loader here as well at some point, otherwise this - # won't find role- or playbook-based connection plugins cur_line = stdin.readline() init_data = b'' + while cur_line.strip() != b'#END_INIT#': if cur_line == b'': raise Exception("EOF found before init data was complete") init_data += cur_line cur_line = stdin.readline() + if PY3: pc_data = cPickle.loads(init_data, encoding='bytes') else: pc_data = cPickle.loads(init_data) - pc = PlayContext() - pc.deserialize(pc_data) + play_context = PlayContext() + play_context.deserialize(pc_data) except Exception as e: - # FIXME: better error message/handling/logging - sys.stderr.write(traceback.format_exc()) - sys.exit("FAIL: %s" % e) - - ssh = connection_loader.get('ssh', class_only=True) - cp = ssh._create_control_path(pc.remote_addr, pc.port, pc.remote_user, pc.connection) - - # create the persistent connection dir if need be and create the paths - # which we will be using later - tmp_path = unfrackpath(C.PERSISTENT_CONTROL_PATH_DIR) - makedirs_safe(tmp_path) - lock_path = unfrackpath("%s/.ansible_pc_lock" % tmp_path) - socket_path = unfrackpath(cp % dict(directory=tmp_path)) - - # if the socket file doesn't exist, spin up the daemon process - lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o600) - fcntl.lockf(lock_fd, fcntl.LOCK_EX) - - if not os.path.exists(socket_path): - pid = do_fork() - if pid == 0: - rc = 0 - try: - server = Server(socket_path, pc) - except AnsibleConnectionFailure as exc: - display.display('connecting to host %s returned an error' % pc.remote_addr, log_only=True) - display.display(str(exc), log_only=True) - rc = 1 - except Exception as exc: - display.display('failed to create control socket for host %s' % pc.remote_addr, log_only=True) - display.display(traceback.format_exc(), log_only=True) - rc = 1 - fcntl.lockf(lock_fd, fcntl.LOCK_UN) - os.close(lock_fd) - if rc == 0: - server.run() - sys.exit(rc) - else: - display.display('re-using existing socket for %s@%s:%s' % (pc.remote_user, pc.remote_addr, pc.port), log_only=True) - - fcntl.lockf(lock_fd, fcntl.LOCK_UN) - os.close(lock_fd) - - timeout = pc.timeout - while bool(timeout): - if os.path.exists(socket_path): - display.vvvv('connected to local socket in %s' % (pc.timeout - timeout), pc.remote_addr) - break - time.sleep(1) - timeout -= 1 - else: - raise AnsibleConnectionFailure('timeout waiting for local socket', pc.remote_addr) - - # now connect to the daemon process - # FIXME: if the socket file existed but the daemonized process was killed, - # the connection will timeout here. Need to make this more resilient. - while True: - data = stdin.readline() - if data == b'': - break - if data.strip() == b'': - continue - - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - - connect_retry_timeout = C.PERSISTENT_CONNECT_RETRY_TIMEOUT - while bool(connect_retry_timeout): - try: - sock.connect(socket_path) - break - except socket.error: - time.sleep(1) - connect_retry_timeout -= 1 - else: - display.display('connect retry timeout expired, unable to connect to control socket', pc.remote_addr, pc.remote_user, log_only=True) - display.display('persistent_connect_retry_timeout is %s secs' % (C.PERSISTENT_CONNECT_RETRY_TIMEOUT), pc.remote_addr, pc.remote_user, log_only=True) - sys.stderr.write('failed to connect to control socket') - sys.exit(255) + rc = 1 + result.update({ + 'error': to_text(e), + 'exception': traceback.format_exc() + }) + + if rc == 0: + ssh = connection_loader.get('ssh', class_only=True) + cp = ssh._create_control_path(play_context.remote_addr, play_context.port, play_context.remote_user, play_context.connection) + + # create the persistent connection dir if need be and create the paths + # which we will be using later + tmp_path = unfrackpath(C.PERSISTENT_CONTROL_PATH_DIR) + makedirs_safe(tmp_path) + + lock_path = unfrackpath("%s/.ansible_pc_lock" % tmp_path) + socket_path = unfrackpath(cp % dict(directory=tmp_path)) + + # if the socket file doesn't exist, spin up the daemon process + lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o600) + fcntl.lockf(lock_fd, fcntl.LOCK_EX) + + if not os.path.exists(socket_path): + messages.append('local domain socket does not exist, starting it') + original_path = os.getcwd() + r, w = os.pipe() + pid = fork_process() + + if pid == 0: + try: + os.close(r) + wfd = os.fdopen(w, 'w') + process = ConnectionProcess(wfd, play_context, socket_path, original_path) + process.start() + except Exception as exc: + messages.append(traceback.format_exc()) + rc = 1 + + fcntl.lockf(lock_fd, fcntl.LOCK_UN) + os.close(lock_fd) + + if rc == 0: + process.run() + + sys.exit(rc) - # send the play_context back into the connection so the connection - # can handle any privilege escalation activities - pc_data = b'CONTEXT: %s' % init_data - communicate(sock, pc_data) + else: + os.close(w) + rfd = os.fdopen(r, 'r') + data = json.loads(rfd.read()) + messages.extend(data.pop('messages')) + result.update(data) - rc, stdout, stderr = communicate(sock, data.strip()) + else: + messages.append('found existing local domain socket, using it!') - sys.stdout.write(to_native(stdout)) - sys.stderr.write(to_native(stderr)) + result.update({ + 'messages': messages, + 'socket_path': socket_path + }) - sock.close() - break + if 'exception' in result: + rc = 1 + sys.stderr.write(json.dumps(result)) + else: + rc = 0 + sys.stdout.write(json.dumps(result)) sys.exit(rc) -- cgit v1.2.1