diff options
author | Richard Maw <richard.maw@codethink.co.uk> | 2014-08-29 16:47:54 +0000 |
---|---|---|
committer | Richard Maw <richard.maw@codethink.co.uk> | 2014-08-29 16:52:20 +0000 |
commit | 12b98200a1c3c864a27df644bf0b2e0f468a7fc3 (patch) | |
tree | 83c7364d41c6fcd97b94653fdaec10b27e413f9d | |
parent | 9846f276e00e5f8f3e520e940ea8b78a44fec150 (diff) | |
download | morph-12b98200a1c3c864a27df644bf0b2e0f468a7fc3.tar.gz |
Replace sleeping non-blocking loop with asyncore
This is ~10 lines shorter than the non-blocking loop, though improving
the whitespace formatting would probably cut that down to being about
the same.
However asyncore uses a select/poll internally, so doesn't need to sleep
to avoid busy-waiting since it can block, which makes it more responsive
to output than the read loop.
-rw-r--r-- | morphlib/plugins/deploy_plugin.py | 81 |
1 files changed, 34 insertions, 47 deletions
diff --git a/morphlib/plugins/deploy_plugin.py b/morphlib/plugins/deploy_plugin.py index 2ac98743..da72f91b 100644 --- a/morphlib/plugins/deploy_plugin.py +++ b/morphlib/plugins/deploy_plugin.py @@ -14,7 +14,8 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import errno +import asyncore +import asynchat import fcntl import json import logging @@ -24,7 +25,6 @@ import subprocess import sys import tarfile import tempfile -import time import uuid import cliapp @@ -545,11 +545,6 @@ class DeployPlugin(cliapp.Plugin): self.app.status(msg='Cleaning up') shutil.rmtree(deploy_private_tempdir) - def _set_nonblocking(self, fd): - flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) - flags = flags | os.O_NONBLOCK - fcntl.fcntl(fd, fcntl.F_SETFL, flags) - def _run_extension(self, gd, name, kind, args, env): '''Run an extension. @@ -579,60 +574,52 @@ class DeployPlugin(cliapp.Plugin): [ext_filename] + args, cwd=gd.dirname, env=new_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - for fd in [log_read_fd, log_write_fd, p.stderr.fileno(), - p.stdout.fileno()]: - self._set_nonblocking(fd) - self._watch_extension_subprocess(name, kind, p, log_read_fd) finally: os.close(log_read_fd) os.close(log_write_fd) - def _read_lines_nonblocking(self, fd): - # We must use os.read() directly here: see - # <http://bugs.python.org/issue1175#msg56041> - reads = [] - while True: - try: - data = os.read(fd, 1024) - if len(data) == 0: - break - reads.append(data) - except OSError as e: - if e.errno == errno.EWOULDBLOCK: - break - raise e - if len(reads) == 0: - return [] - else: - full_text = ''.join(reads) - lines = full_text.rstrip().split('\n') - return lines - def _watch_extension_subprocess(self, name, kind, p, log_read_fd): '''Follow stdout, stderr and log output of an extension subprocess.''' error = [] + class OutputDispatcher(asynchat.async_chat, asyncore.file_dispatcher): + def __init__(self, sock, map=None): + asynchat.async_chat.__init__(self, sock=None, map=map) + asyncore.file_dispatcher.__init__(self, fd=sock, map=map) + self.set_terminator('\n') + def collect_incoming_data(self, data): + return self._collect_incoming_data(data) + class StdoutDispatcher(OutputDispatcher): + def __init__(self, status, sock, map=None): + self.status = status + OutputDispatcher.__init__(self, sock=sock, map=map) + @staticmethod + def escape(line): + return line.replace('%', '%%') + def found_terminator(self): + self.status(msg=self.escape(''.join(self.incoming))) + class StderrDispatcher(OutputDispatcher): + def found_terminator(self): + line = ''.join(self.incoming) + error.append(line) + sys.stderr.write(line) + sys.stderr.write('\n') + class LogDispatcher(OutputDispatcher): + def found_terminator(self): + line = ''.join(self.incoming) + logging.debug('%s%s: %s', name, kind, line) try: + socket_map = {} + StdoutDispatcher(status=self.app.status, + sock=p.stdout, map=socket_map) + StderrDispatcher(sock=p.stderr, map=socket_map) + LogDispatcher(sock=log_read_fd, map=socket_map) while True: - def escape(line): - return line.replace('%', '%%') - - for line in self._read_lines_nonblocking(p.stdout.fileno()): - self.app.status(msg=escape(line.rstrip())) - - for line in self._read_lines_nonblocking(p.stderr.fileno()): - error.append(line) - sys.stderr.write(line) - sys.stderr.write('\n') - - for line in self._read_lines_nonblocking(log_read_fd): - logging.debug('%s%s: %s', name, kind, line.rstrip()) + asyncore.loop(use_poll=True, map=socket_map, count=1) if p.poll() is not None: break - - time.sleep(0.2) except BaseException as e: logging.debug('Received exception %r watching extension' % e) p.terminate() |