diff options
-rw-r--r-- | morphlib/extensions.py | 116 | ||||
-rw-r--r-- | morphlib/plugins/deploy_plugin.py | 53 |
2 files changed, 143 insertions, 26 deletions
diff --git a/morphlib/extensions.py b/morphlib/extensions.py index 55478418..af6ba279 100644 --- a/morphlib/extensions.py +++ b/morphlib/extensions.py @@ -13,14 +13,21 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import cliapp -import morphlib +import asyncore +import asynchat import glob +import logging import os -import sysbranchdir import stat +import subprocess import tempfile +import cliapp + +import morphlib +import sysbranchdir + + class ExtensionError(morphlib.Error): pass @@ -149,3 +156,106 @@ class get_extension_filename(): def __exit__(self, type, value, trace): if self.delete: os.remove(self.ext_filename) + + +class _EOFWrapper(asyncore.file_wrapper): + '''File object that reports when it hits EOF + + The async_chat class doesn't notice that its input file has hit EOF, + so if we give it one of these instead, it will mark the chatter for + closiure and ensure any in-progress buffers are flushed. + ''' + def __init__(self, dispatcher, fd): + self._dispatcher = dispatcher + asyncore.file_wrapper.__init__(self, fd) + + def recv(self, *args): + data = asyncore.file_wrapper.recv(self, *args) + if not data: + self._dispatcher.close_when_done() + # ensure any unterminated data is flushed + return self._dispatcher.get_terminator() + return data + + +class _OutputDispatcher(asynchat.async_chat, asyncore.file_dispatcher): + '''asyncore dispatcher that calls line_handler per line.''' + def __init__(self, fd, line_handler, map=None): + asynchat.async_chat.__init__(self, sock=None, map=map) + asyncore.file_dispatcher.__init__(self, fd=fd, map=map) + self.set_terminator('\n') + self._line_handler = line_handler + collect_incoming_data = asynchat.async_chat._collect_incoming_data + def set_file(self, fd): + self.socket = _EOFWrapper(self, fd) + self._fileno = self.socket.fileno() + self.add_channel() + def found_terminator(self): + self._line_handler(''.join(self.incoming)) + self.incoming = [] + +class ExtensionSubprocess(object): + + def __init__(self, report_stdout, report_stderr, report_logger): + self._report_stdout = report_stdout + self._report_stderr = report_stderr + self._report_logger = report_logger + + def run(self, filename, args, cwd, env): + '''Run an extension. + + Anything written by the extension to stdout is passed to status(), thus + normally echoed to Morph's stdout. An extra FD is passed in the + environment variable MORPH_LOG_FD, and anything written here will be + included as debug messages in Morph's log file. + + ''' + + log_read_fd, log_write_fd = os.pipe() + + try: + new_env = env.copy() + new_env['MORPH_LOG_FD'] = str(log_write_fd) + + # Because we don't have python 3.2's pass_fds, we have to + # play games with preexec_fn to close the fds we don't + # need to inherit + def close_read_end(): + os.close(log_read_fd) + p = subprocess.Popen( + [filename] + args, cwd=cwd, env=new_env, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + preexec_fn=close_read_end) + os.close(log_write_fd) + log_write_fd = None + + return self._watch_extension_subprocess(p, log_read_fd) + finally: + os.close(log_read_fd) + if log_write_fd is not None: + os.close(log_write_fd) + + def _watch_extension_subprocess(self, p, log_read_fd): + '''Follow stdout, stderr and log output of an extension subprocess.''' + + try: + socket_map = {} + for handler, fd in ((self._report_stdout, p.stdout), + (self._report_stderr, p.stderr), + (self._report_logger, log_read_fd)): + _OutputDispatcher(line_handler=handler, fd=fd, + map=socket_map) + asyncore.loop(use_poll=True, map=socket_map) + + returncode = p.wait() + assert returncode is not None + except BaseException as e: + logging.debug('Received exception %r watching extension' % e) + p.terminate() + p.wait() + raise + finally: + p.stdout.close() + p.stderr.close() + + return returncode diff --git a/morphlib/plugins/deploy_plugin.py b/morphlib/plugins/deploy_plugin.py index f3e43fa8..a80079fa 100644 --- a/morphlib/plugins/deploy_plugin.py +++ b/morphlib/plugins/deploy_plugin.py @@ -14,18 +14,19 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import cliapp -import contextlib import json +import logging import os import shutil -import stat +import sys import tarfile import tempfile import uuid +import cliapp import morphlib + class DeployPlugin(cliapp.Plugin): def enable(self): @@ -540,33 +541,39 @@ class DeployPlugin(cliapp.Plugin): self.app.status(msg='Cleaning up') shutil.rmtree(deploy_private_tempdir) + def _report_extension_stdout(self, line): + self.app.status(msg=line.replace('%s', '%%')) + def _report_extension_stderr(self, error_list): + def cb(line): + error_list.append(line) + sys.stderr.write('%s\n' % line) + return cb + def _report_extension_logger(self, name, kind): + return lambda line: logging.debug('%s%s: %s', name, kind, line) def _run_extension(self, gd, name, kind, args, env): '''Run an extension. - + The ``kind`` should be either ``.configure`` or ``.write``, depending on the kind of extension that is sought. - + The extension is found either in the git repository of the system morphology (repo, ref), or with the Morph code. - + ''' - with morphlib.extensions.get_extension_filename( - name, kind) as ext_filename: - self.app.status(msg='Running extension %(name)s%(kind)s', - name=name, kind=kind) - self.app.runcmd( - [ext_filename] + args, - ['sh', - '-c', - 'while read l; do echo `date "+%F %T"` "$1$l"; done', - '-', - '%s[%s]' % (self.app.status_prefix, name + kind)], - cwd=gd.dirname, env=env, stdout=None, stderr=None) - - def _is_executable(self, filename): - st = os.stat(filename) - mask = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH - return (stat.S_IMODE(st.st_mode) & mask) != 0 + error_list = [] + with morphlib.extensions.get_extension_filename(name, kind) as fn: + ext = morphlib.extensions.ExtensionSubprocess( + report_stdout=self._report_extension_stdout, + report_stderr=self._report_extension_stderr(error_list), + report_logger=self._report_extension_logger(name, kind), + ) + returncode = ext.run(fn, args, env=env, cwd=gd.dirname) + if returncode == 0: + logging.info('%s%s succeeded', name, kind) + else: + message = '%s%s failed with code %s: %s' % ( + name, kind, returncode, '\n'.join(error_list)) + raise cliapp.AppException(message) def create_metadata(self, system_artifact, root_repo_dir, deployment_type, location, env): |