diff options
-rw-r--r-- | lib/ansible/modules/utilities/helper/_accelerate.py | 77 | ||||
-rw-r--r-- | lib/ansible/modules/utilities/logic/async_status.py | 8 | ||||
-rw-r--r-- | lib/ansible/modules/utilities/logic/async_wrapper.py | 52 | ||||
-rw-r--r-- | test/sanity/pep8/legacy-files.txt | 3 |
4 files changed, 75 insertions, 65 deletions
diff --git a/lib/ansible/modules/utilities/helper/_accelerate.py b/lib/ansible/modules/utilities/helper/_accelerate.py index 4df745dd48..c6bf2604e4 100644 --- a/lib/ansible/modules/utilities/helper/_accelerate.py +++ b/lib/ansible/modules/utilities/helper/_accelerate.py @@ -105,25 +105,31 @@ from ansible.module_utils.six.moves import socketserver # leaving room for base64 (+33%) encoding and header (100 bytes) # 4 * (975/3) + 100 = 1400 # which leaves room for the TCP/IP header -CHUNK_SIZE=10240 +CHUNK_SIZE = 10240 # FIXME: this all should be moved to module_common, as it's # pretty much a copy from the callbacks/util code -DEBUG_LEVEL=0 +DEBUG_LEVEL = 0 + + def log(msg, cap=0): global DEBUG_LEVEL if DEBUG_LEVEL >= cap: - syslog.syslog(syslog.LOG_NOTICE|syslog.LOG_DAEMON, msg) + syslog.syslog(syslog.LOG_NOTICE | syslog.LOG_DAEMON, msg) + def v(msg): log(msg, cap=1) + def vv(msg): log(msg, cap=2) + def vvv(msg): log(msg, cap=3) + def vvvv(msg): log(msg, cap=4) @@ -137,6 +143,7 @@ except ImportError: SOCKET_FILE = os.path.join(get_module_path(), '.ansible-accelerate', ".local.socket") + def get_pid_location(module): """ Try to find a pid directory in the common locations, falling @@ -144,7 +151,7 @@ def get_pid_location(module): """ for dir in ['/var/run', '/var/lib/run', '/run', os.path.expanduser("~/")]: try: - if os.path.isdir(dir) and os.access(dir, os.R_OK|os.W_OK): + if os.path.isdir(dir) and os.access(dir, os.R_OK | os.W_OK): return os.path.join(dir, '.accelerate.pid') except: pass @@ -185,12 +192,13 @@ def daemonize_self(module, password, port, minutes, pid_file): log('fork #2 failed: %d (%s)' % (e.errno, e.strerror)) sys.exit(1) - dev_null = open('/dev/null','rw') + dev_null = open('/dev/null', 'rw') os.dup2(dev_null.fileno(), sys.stdin.fileno()) os.dup2(dev_null.fileno(), sys.stdout.fileno()) os.dup2(dev_null.fileno(), sys.stderr.fileno()) log("daemonizing successful") + class LocalSocketThread(Thread): server = None terminated = False @@ -271,6 +279,7 @@ class LocalSocketThread(Thread): self.s.shutdown(socket.SHUT_RDWR) self.s.close() + class ThreadWithReturnValue(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None): Thread.__init__(self, group, target, name, args, kwargs, Verbose) @@ -281,14 +290,16 @@ class ThreadWithReturnValue(Thread): self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) - def join(self,timeout=None): + def join(self, timeout=None): Thread.join(self, timeout=timeout) return self._return + class ThreadedTCPServer(socketserver.ThreadingTCPServer): key_list = [] last_event = datetime.datetime.now() last_event_lock = Lock() + def __init__(self, server_address, RequestHandlerClass, module, password, timeout, use_ipv6=False): self.module = module self.key_list.append(AesKey.Read(password)) @@ -309,6 +320,7 @@ class ThreadedTCPServer(socketserver.ThreadingTCPServer): self.running = False socketserver.ThreadingTCPServer.shutdown(self) + class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): # the key to use for this connection active_key = None @@ -324,7 +336,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): return self.request.sendall(packed_len + data) def recv_data(self): - header_len = 8 # size of a packed unsigned long long + header_len = 8 # size of a packed unsigned long long data = "" vvvv("in recv_data(), waiting for the header") while len(data) < header_len: @@ -339,9 +351,9 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): vvvv("exception received while waiting for recv(), returning None") return None vvvv("in recv_data(), got the header, unpacking") - data_len = struct.unpack('!Q',data[:header_len])[0] + data_len = struct.unpack('!Q', data[:header_len])[0] data = data[header_len:] - vvvv("data received so far (expecting %d): %d" % (data_len,len(data))) + vvvv("data received so far (expecting %d): %d" % (data_len, len(data))) while len(data) < data_len: try: d = self.request.recv(data_len - len(data)) @@ -349,7 +361,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): vvv("received nothing, bailing out") return None data += d - vvvv("data received so far (expecting %d): %d" % (data_len,len(data))) + vvvv("data received so far (expecting %d): %d" % (data_len, len(data))) except: # probably got a connection reset vvvv("exception received while waiting for recv(), returning None") @@ -511,7 +523,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): response = self.active_key.Decrypt(response) response = json.loads(response) - if response.get('failed',False): + if response.get('failed', False): log("got a failed response from the master") return dict(failed=True, stderr="Master reported failure, aborting transfer") except Exception as e: @@ -538,7 +550,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): os.makedirs(tmp_path, int('O700', 8)) except: return dict(failed=True, msg='could not create a temporary directory at %s' % tmp_path) - (fd,out_path) = tempfile.mkstemp(prefix='ansible.', dir=tmp_path) + (fd, out_path) = tempfile.mkstemp(prefix='ansible.', dir=tmp_path) out_fd = os.fdopen(fd, 'w', 0) final_path = data['out_path'] else: @@ -546,7 +558,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): out_fd = open(out_path, 'w') try: - bytes=0 + bytes = 0 while True: out = base64.b64decode(data['data']) bytes += len(out) @@ -575,6 +587,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): self.server.module.atomic_move(out_path, final_path) return dict() + def daemonize(module, password, port, timeout, minutes, use_ipv6, pid_file): try: daemonize_self(module, password, port, minutes, pid_file) @@ -613,7 +626,7 @@ def daemonize(module, password, port, timeout, minutes, use_ipv6, pid_file): server.allow_reuse_address = True break except Exception as e: - vv("Failed to create the TCP server (tries left = %d) (error: %s) " % (tries,e)) + vv("Failed to create the TCP server (tries left = %d) (error: %s) " % (tries, e)) tries -= 1 time.sleep(0.2) @@ -640,35 +653,36 @@ def daemonize(module, password, port, timeout, minutes, use_ipv6, pid_file): log("exception caught, exiting accelerated mode: %s\n%s" % (e, tb)) sys.exit(0) + def main(): global DEBUG_LEVEL module = AnsibleModule( - argument_spec = dict( - port=dict(required=False, default=5099), - ipv6=dict(required=False, default=False, type='bool'), - multi_key=dict(required=False, default=False, type='bool'), - timeout=dict(required=False, default=300), - password=dict(required=True, no_log=True), - minutes=dict(required=False, default=30), - debug=dict(required=False, default=0, type='int') + argument_spec=dict( + port=dict(type='int', default=5099), + ipv6=dict(type='bool', default=False), + multi_key=dict(type='bool', default=False), + timeout=dict(type='int', default=300), + password=dict(type='str', required=True, no_log=True), + minutes=dict(type='int', default=30), + debug=dict(type='int', default=0) ), supports_check_mode=True ) syslog.openlog('ansible-%s' % module._name) - password = base64.b64decode(module.params['password']) - port = int(module.params['port']) - timeout = int(module.params['timeout']) - minutes = int(module.params['minutes']) - debug = int(module.params['debug']) - ipv6 = module.params['ipv6'] + password = base64.b64decode(module.params['password']) + port = int(module.params['port']) + timeout = int(module.params['timeout']) + minutes = int(module.params['minutes']) + debug = int(module.params['debug']) + ipv6 = module.params['ipv6'] multi_key = module.params['multi_key'] if not HAS_KEYCZAR: module.fail_json(msg="keyczar is not installed (on the remote side)") - DEBUG_LEVEL=debug + DEBUG_LEVEL = debug pid_file = get_pid_location(module) daemon_pid = None @@ -682,9 +696,7 @@ def main(): # whether other signals can be sent os.kill(daemon_pid, 0) except OSError as e: - message = 'the accelerate daemon appears to be running' - message += 'as a different user that this user cannot access' - message += 'pid=%s' % daemon_pid + message = 'the accelerate daemon appears to be running as a different user that this user cannot access pid=%s' % daemon_pid if e.errno == errno.EPERM: # no permissions means the pid is probably @@ -726,5 +738,6 @@ def main(): # try to start up the daemon daemonize(module, password, port, timeout, minutes, ipv6, pid_file) + if __name__ == '__main__': main() diff --git a/lib/ansible/modules/utilities/logic/async_status.py b/lib/ansible/modules/utilities/logic/async_status.py index 6c7c248a29..f2a73dd979 100644 --- a/lib/ansible/modules/utilities/logic/async_status.py +++ b/lib/ansible/modules/utilities/logic/async_status.py @@ -51,11 +51,11 @@ def main(): module = AnsibleModule(argument_spec=dict( jid=dict(required=True), - mode=dict(default='status', choices=['status','cleanup']), + mode=dict(default='status', choices=['status', 'cleanup']), )) mode = module.params['mode'] - jid = module.params['jid'] + jid = module.params['jid'] # setup logging directory logdir = os.path.expanduser("~/.ansible_async") @@ -82,9 +82,9 @@ def main(): module.exit_json(results_file=log_path, ansible_job_id=jid, started=1, finished=0) else: module.fail_json(ansible_job_id=jid, results_file=log_path, - msg="Could not parse job output: %s" % data, started=1, finished=1) + msg="Could not parse job output: %s" % data, started=1, finished=1) - if not 'started' in data: + if 'started' not in data: data['finished'] = 1 data['ansible_job_id'] = jid elif 'finished' not in data: diff --git a/lib/ansible/modules/utilities/logic/async_wrapper.py b/lib/ansible/modules/utilities/logic/async_wrapper.py index db18c944eb..fb3a7aca8c 100644 --- a/lib/ansible/modules/utilities/logic/async_wrapper.py +++ b/lib/ansible/modules/utilities/logic/async_wrapper.py @@ -27,9 +27,11 @@ PY3 = sys.version_info[0] == 3 syslog.openlog('ansible-%s' % os.path.basename(__file__)) syslog.syslog(syslog.LOG_NOTICE, 'Invoked with %s' % " ".join(sys.argv[1:])) + def notice(msg): syslog.syslog(syslog.LOG_NOTICE, msg) + def daemonize_self(): # daemonizing code: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012 try: @@ -60,6 +62,7 @@ def daemonize_self(): os.dup2(dev_null.fileno(), sys.stdout.fileno()) os.dup2(dev_null.fileno(), sys.stderr.fileno()) + # NB: this function copied from module_utils/json_utils.py. Ensure any changes are propagated there. # FUTURE: AnsibleModule-ify this module so it's Ansiballz-compatible and can use the module_utils copy of this function. def _filter_non_json_lines(data): @@ -121,7 +124,7 @@ def _run_module(wrapped_cmd, jid, job_path): tmp_job_path = job_path + ".tmp" jobfile = open(tmp_job_path, "w") - jobfile.write(json.dumps({ "started" : 1, "finished" : 0, "ansible_job_id" : jid })) + jobfile.write(json.dumps({"started": 1, "finished": 0, "ansible_job_id": jid})) jobfile.close() os.rename(tmp_job_path, job_path) jobfile = open(tmp_job_path, "w") @@ -163,9 +166,9 @@ def _run_module(wrapped_cmd, jid, job_path): e = sys.exc_info()[1] result = { "failed": 1, - "cmd" : wrapped_cmd, + "cmd": wrapped_cmd, "msg": str(e), - "outdata": outdata, # temporary notice only + "outdata": outdata, # temporary notice only "stderr": stderr } result['ansible_job_id'] = jid @@ -173,11 +176,11 @@ def _run_module(wrapped_cmd, jid, job_path): except (ValueError, Exception): result = { - "failed" : 1, - "cmd" : wrapped_cmd, - "data" : outdata, # temporary notice only + "failed": 1, + "cmd": wrapped_cmd, + "data": outdata, # temporary notice only "stderr": stderr, - "msg" : traceback.format_exc() + "msg": traceback.format_exc() } result['ansible_job_id'] = jid jobfile.write(json.dumps(result)) @@ -186,16 +189,13 @@ def _run_module(wrapped_cmd, jid, job_path): os.rename(tmp_job_path, job_path) -#################### -## main ## -#################### if __name__ == '__main__': if len(sys.argv) < 5: print(json.dumps({ - "failed" : True, - "msg" : "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile> [-preserve_tmp] " - "Humans, do not call directly!" + "failed": True, + "msg": "usage: async_wrapper <jid> <time_limit> <modulescript> <argsfile> [-preserve_tmp] " + "Humans, do not call directly!" })) sys.exit(1) @@ -225,8 +225,8 @@ if __name__ == '__main__': os.makedirs(jobdir) except: print(json.dumps({ - "failed" : 1, - "msg" : "could not create: %s" % jobdir + "failed": 1, + "msg": "could not create: %s" % jobdir })) # immediately exit this process, leaving an orphaned process # running which immediately forks a supervisory timing process @@ -241,8 +241,8 @@ if __name__ == '__main__': # this probably could be done with some IPC later. Modules should always read # the argsfile at the very first start of their execution anyway notice("Return async_wrapper task started.") - print(json.dumps({ "started" : 1, "finished" : 0, "ansible_job_id" : jid, "results_file" : job_path, - "_ansible_suppress_tmpdir_delete": not preserve_tmp})) + print(json.dumps({"started": 1, "finished": 0, "ansible_job_id": jid, "results_file": job_path, + "_ansible_suppress_tmpdir_delete": not preserve_tmp})) sys.stdout.flush() time.sleep(1) sys.exit(0) @@ -263,16 +263,16 @@ if __name__ == '__main__': # set the child process group id to kill all children os.setpgid(sub_pid, sub_pid) - notice("Start watching %s (%s)"%(sub_pid, remaining)) + notice("Start watching %s (%s)" % (sub_pid, remaining)) time.sleep(step) while os.waitpid(sub_pid, os.WNOHANG) == (0, 0): - notice("%s still running (%s)"%(sub_pid, remaining)) + notice("%s still running (%s)" % (sub_pid, remaining)) time.sleep(step) remaining = remaining - step if remaining <= 0: - notice("Now killing %s"%(sub_pid)) + notice("Now killing %s" % (sub_pid)) os.killpg(sub_pid, signal.SIGKILL) - notice("Sent kill to group %s"%sub_pid) + notice("Sent kill to group %s " % sub_pid) time.sleep(1) if not preserve_tmp: shutil.rmtree(os.path.dirname(wrapped_module), True) @@ -283,9 +283,9 @@ if __name__ == '__main__': sys.exit(0) else: # the child process runs the actual module - notice("Start module (%s)"%os.getpid()) + notice("Start module (%s)" % os.getpid()) _run_module(cmd, jid, job_path) - notice("Module complete (%s)"%os.getpid()) + notice("Module complete (%s)" % os.getpid()) sys.exit(0) except SystemExit: @@ -295,9 +295,9 @@ if __name__ == '__main__': except Exception: e = sys.exc_info()[1] - notice("error: %s"%(e)) + notice("error: %s" % e) print(json.dumps({ - "failed" : True, - "msg" : "FATAL ERROR: %s" % str(e) + "failed": True, + "msg": "FATAL ERROR: %s" % e })) sys.exit(1) diff --git a/test/sanity/pep8/legacy-files.txt b/test/sanity/pep8/legacy-files.txt index bacff54616..bb727ea5f7 100644 --- a/test/sanity/pep8/legacy-files.txt +++ b/test/sanity/pep8/legacy-files.txt @@ -483,7 +483,4 @@ lib/ansible/modules/system/systemd.py lib/ansible/modules/system/timezone.py lib/ansible/modules/system/ufw.py lib/ansible/modules/system/user.py -lib/ansible/modules/utilities/helper/_accelerate.py -lib/ansible/modules/utilities/logic/async_status.py -lib/ansible/modules/utilities/logic/async_wrapper.py lib/ansible/playbook/base.py |