diff options
Diffstat (limited to 'morphlib/extensions.py')
-rw-r--r-- | morphlib/extensions.py | 116 |
1 files changed, 113 insertions, 3 deletions
diff --git a/morphlib/extensions.py b/morphlib/extensions.py index 55478418..af6ba279 100644 --- a/morphlib/extensions.py +++ b/morphlib/extensions.py @@ -13,14 +13,21 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import cliapp -import morphlib +import asyncore +import asynchat import glob +import logging import os -import sysbranchdir import stat +import subprocess import tempfile +import cliapp + +import morphlib +import sysbranchdir + + class ExtensionError(morphlib.Error): pass @@ -149,3 +156,106 @@ class get_extension_filename(): def __exit__(self, type, value, trace): if self.delete: os.remove(self.ext_filename) + + +class _EOFWrapper(asyncore.file_wrapper): + '''File object that reports when it hits EOF + + The async_chat class doesn't notice that its input file has hit EOF, + so if we give it one of these instead, it will mark the chatter for + closiure and ensure any in-progress buffers are flushed. + ''' + def __init__(self, dispatcher, fd): + self._dispatcher = dispatcher + asyncore.file_wrapper.__init__(self, fd) + + def recv(self, *args): + data = asyncore.file_wrapper.recv(self, *args) + if not data: + self._dispatcher.close_when_done() + # ensure any unterminated data is flushed + return self._dispatcher.get_terminator() + return data + + +class _OutputDispatcher(asynchat.async_chat, asyncore.file_dispatcher): + '''asyncore dispatcher that calls line_handler per line.''' + def __init__(self, fd, line_handler, map=None): + asynchat.async_chat.__init__(self, sock=None, map=map) + asyncore.file_dispatcher.__init__(self, fd=fd, map=map) + self.set_terminator('\n') + self._line_handler = line_handler + collect_incoming_data = asynchat.async_chat._collect_incoming_data + def set_file(self, fd): + self.socket = _EOFWrapper(self, fd) + self._fileno = self.socket.fileno() + self.add_channel() + def found_terminator(self): + self._line_handler(''.join(self.incoming)) + self.incoming = [] + +class ExtensionSubprocess(object): + + def __init__(self, report_stdout, report_stderr, report_logger): + self._report_stdout = report_stdout + self._report_stderr = report_stderr + self._report_logger = report_logger + + def run(self, filename, args, cwd, env): + '''Run an extension. + + Anything written by the extension to stdout is passed to status(), thus + normally echoed to Morph's stdout. An extra FD is passed in the + environment variable MORPH_LOG_FD, and anything written here will be + included as debug messages in Morph's log file. + + ''' + + log_read_fd, log_write_fd = os.pipe() + + try: + new_env = env.copy() + new_env['MORPH_LOG_FD'] = str(log_write_fd) + + # Because we don't have python 3.2's pass_fds, we have to + # play games with preexec_fn to close the fds we don't + # need to inherit + def close_read_end(): + os.close(log_read_fd) + p = subprocess.Popen( + [filename] + args, cwd=cwd, env=new_env, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + preexec_fn=close_read_end) + os.close(log_write_fd) + log_write_fd = None + + return self._watch_extension_subprocess(p, log_read_fd) + finally: + os.close(log_read_fd) + if log_write_fd is not None: + os.close(log_write_fd) + + def _watch_extension_subprocess(self, p, log_read_fd): + '''Follow stdout, stderr and log output of an extension subprocess.''' + + try: + socket_map = {} + for handler, fd in ((self._report_stdout, p.stdout), + (self._report_stderr, p.stderr), + (self._report_logger, log_read_fd)): + _OutputDispatcher(line_handler=handler, fd=fd, + map=socket_map) + asyncore.loop(use_poll=True, map=socket_map) + + returncode = p.wait() + assert returncode is not None + except BaseException as e: + logging.debug('Received exception %r watching extension' % e) + p.terminate() + p.wait() + raise + finally: + p.stdout.close() + p.stderr.close() + + return returncode |