summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Maw <richard.maw@codethink.co.uk>2014-08-29 16:47:54 +0000
committerRichard Maw <richard.maw@codethink.co.uk>2014-08-29 16:52:20 +0000
commit12b98200a1c3c864a27df644bf0b2e0f468a7fc3 (patch)
tree83c7364d41c6fcd97b94653fdaec10b27e413f9d
parent9846f276e00e5f8f3e520e940ea8b78a44fec150 (diff)
downloadmorph-12b98200a1c3c864a27df644bf0b2e0f468a7fc3.tar.gz
Replace sleeping non-blocking loop with asyncore
This is ~10 lines shorter than the non-blocking loop, though improving the whitespace formatting would probably cut that down to being about the same. However asyncore uses a select/poll internally, so doesn't need to sleep to avoid busy-waiting since it can block, which makes it more responsive to output than the read loop.
-rw-r--r--morphlib/plugins/deploy_plugin.py81
1 files changed, 34 insertions, 47 deletions
diff --git a/morphlib/plugins/deploy_plugin.py b/morphlib/plugins/deploy_plugin.py
index 2ac98743..da72f91b 100644
--- a/morphlib/plugins/deploy_plugin.py
+++ b/morphlib/plugins/deploy_plugin.py
@@ -14,7 +14,8 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import errno
+import asyncore
+import asynchat
import fcntl
import json
import logging
@@ -24,7 +25,6 @@ import subprocess
import sys
import tarfile
import tempfile
-import time
import uuid
import cliapp
@@ -545,11 +545,6 @@ class DeployPlugin(cliapp.Plugin):
self.app.status(msg='Cleaning up')
shutil.rmtree(deploy_private_tempdir)
- def _set_nonblocking(self, fd):
- flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
- flags = flags | os.O_NONBLOCK
- fcntl.fcntl(fd, fcntl.F_SETFL, flags)
-
def _run_extension(self, gd, name, kind, args, env):
'''Run an extension.
@@ -579,60 +574,52 @@ class DeployPlugin(cliapp.Plugin):
[ext_filename] + args, cwd=gd.dirname, env=new_env,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- for fd in [log_read_fd, log_write_fd, p.stderr.fileno(),
- p.stdout.fileno()]:
- self._set_nonblocking(fd)
-
self._watch_extension_subprocess(name, kind, p, log_read_fd)
finally:
os.close(log_read_fd)
os.close(log_write_fd)
- def _read_lines_nonblocking(self, fd):
- # We must use os.read() directly here: see
- # <http://bugs.python.org/issue1175#msg56041>
- reads = []
- while True:
- try:
- data = os.read(fd, 1024)
- if len(data) == 0:
- break
- reads.append(data)
- except OSError as e:
- if e.errno == errno.EWOULDBLOCK:
- break
- raise e
- if len(reads) == 0:
- return []
- else:
- full_text = ''.join(reads)
- lines = full_text.rstrip().split('\n')
- return lines
-
def _watch_extension_subprocess(self, name, kind, p, log_read_fd):
'''Follow stdout, stderr and log output of an extension subprocess.'''
error = []
+ class OutputDispatcher(asynchat.async_chat, asyncore.file_dispatcher):
+ def __init__(self, sock, map=None):
+ asynchat.async_chat.__init__(self, sock=None, map=map)
+ asyncore.file_dispatcher.__init__(self, fd=sock, map=map)
+ self.set_terminator('\n')
+ def collect_incoming_data(self, data):
+ return self._collect_incoming_data(data)
+ class StdoutDispatcher(OutputDispatcher):
+ def __init__(self, status, sock, map=None):
+ self.status = status
+ OutputDispatcher.__init__(self, sock=sock, map=map)
+ @staticmethod
+ def escape(line):
+ return line.replace('%', '%%')
+ def found_terminator(self):
+ self.status(msg=self.escape(''.join(self.incoming)))
+ class StderrDispatcher(OutputDispatcher):
+ def found_terminator(self):
+ line = ''.join(self.incoming)
+ error.append(line)
+ sys.stderr.write(line)
+ sys.stderr.write('\n')
+ class LogDispatcher(OutputDispatcher):
+ def found_terminator(self):
+ line = ''.join(self.incoming)
+ logging.debug('%s%s: %s', name, kind, line)
try:
+ socket_map = {}
+ StdoutDispatcher(status=self.app.status,
+ sock=p.stdout, map=socket_map)
+ StderrDispatcher(sock=p.stderr, map=socket_map)
+ LogDispatcher(sock=log_read_fd, map=socket_map)
while True:
- def escape(line):
- return line.replace('%', '%%')
-
- for line in self._read_lines_nonblocking(p.stdout.fileno()):
- self.app.status(msg=escape(line.rstrip()))
-
- for line in self._read_lines_nonblocking(p.stderr.fileno()):
- error.append(line)
- sys.stderr.write(line)
- sys.stderr.write('\n')
-
- for line in self._read_lines_nonblocking(log_read_fd):
- logging.debug('%s%s: %s', name, kind, line.rstrip())
+ asyncore.loop(use_poll=True, map=socket_map, count=1)
if p.poll() is not None:
break
-
- time.sleep(0.2)
except BaseException as e:
logging.debug('Received exception %r watching extension' % e)
p.terminate()