summaryrefslogtreecommitdiff
path: root/morphlib/extensions.py
diff options
context:
space:
mode:
Diffstat (limited to 'morphlib/extensions.py')
-rw-r--r--morphlib/extensions.py116
1 files changed, 113 insertions, 3 deletions
diff --git a/morphlib/extensions.py b/morphlib/extensions.py
index 55478418..af6ba279 100644
--- a/morphlib/extensions.py
+++ b/morphlib/extensions.py
@@ -13,14 +13,21 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import cliapp
-import morphlib
+import asyncore
+import asynchat
import glob
+import logging
import os
-import sysbranchdir
import stat
+import subprocess
import tempfile
+import cliapp
+
+import morphlib
+import sysbranchdir
+
+
class ExtensionError(morphlib.Error):
pass
@@ -149,3 +156,106 @@ class get_extension_filename():
def __exit__(self, type, value, trace):
if self.delete:
os.remove(self.ext_filename)
+
+
+class _EOFWrapper(asyncore.file_wrapper):
+ '''File object that reports when it hits EOF
+
+ The async_chat class doesn't notice that its input file has hit EOF,
+ so if we give it one of these instead, it will mark the chatter for
+ closiure and ensure any in-progress buffers are flushed.
+ '''
+ def __init__(self, dispatcher, fd):
+ self._dispatcher = dispatcher
+ asyncore.file_wrapper.__init__(self, fd)
+
+ def recv(self, *args):
+ data = asyncore.file_wrapper.recv(self, *args)
+ if not data:
+ self._dispatcher.close_when_done()
+ # ensure any unterminated data is flushed
+ return self._dispatcher.get_terminator()
+ return data
+
+
+class _OutputDispatcher(asynchat.async_chat, asyncore.file_dispatcher):
+ '''asyncore dispatcher that calls line_handler per line.'''
+ def __init__(self, fd, line_handler, map=None):
+ asynchat.async_chat.__init__(self, sock=None, map=map)
+ asyncore.file_dispatcher.__init__(self, fd=fd, map=map)
+ self.set_terminator('\n')
+ self._line_handler = line_handler
+ collect_incoming_data = asynchat.async_chat._collect_incoming_data
+ def set_file(self, fd):
+ self.socket = _EOFWrapper(self, fd)
+ self._fileno = self.socket.fileno()
+ self.add_channel()
+ def found_terminator(self):
+ self._line_handler(''.join(self.incoming))
+ self.incoming = []
+
+class ExtensionSubprocess(object):
+
+ def __init__(self, report_stdout, report_stderr, report_logger):
+ self._report_stdout = report_stdout
+ self._report_stderr = report_stderr
+ self._report_logger = report_logger
+
+ def run(self, filename, args, cwd, env):
+ '''Run an extension.
+
+ Anything written by the extension to stdout is passed to status(), thus
+ normally echoed to Morph's stdout. An extra FD is passed in the
+ environment variable MORPH_LOG_FD, and anything written here will be
+ included as debug messages in Morph's log file.
+
+ '''
+
+ log_read_fd, log_write_fd = os.pipe()
+
+ try:
+ new_env = env.copy()
+ new_env['MORPH_LOG_FD'] = str(log_write_fd)
+
+ # Because we don't have python 3.2's pass_fds, we have to
+ # play games with preexec_fn to close the fds we don't
+ # need to inherit
+ def close_read_end():
+ os.close(log_read_fd)
+ p = subprocess.Popen(
+ [filename] + args, cwd=cwd, env=new_env,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ preexec_fn=close_read_end)
+ os.close(log_write_fd)
+ log_write_fd = None
+
+ return self._watch_extension_subprocess(p, log_read_fd)
+ finally:
+ os.close(log_read_fd)
+ if log_write_fd is not None:
+ os.close(log_write_fd)
+
+ def _watch_extension_subprocess(self, p, log_read_fd):
+ '''Follow stdout, stderr and log output of an extension subprocess.'''
+
+ try:
+ socket_map = {}
+ for handler, fd in ((self._report_stdout, p.stdout),
+ (self._report_stderr, p.stderr),
+ (self._report_logger, log_read_fd)):
+ _OutputDispatcher(line_handler=handler, fd=fd,
+ map=socket_map)
+ asyncore.loop(use_poll=True, map=socket_map)
+
+ returncode = p.wait()
+ assert returncode is not None
+ except BaseException as e:
+ logging.debug('Received exception %r watching extension' % e)
+ p.terminate()
+ p.wait()
+ raise
+ finally:
+ p.stdout.close()
+ p.stderr.close()
+
+ return returncode