summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Maw <richard.maw@gmail.com>2014-08-29 23:18:46 +0100
committerSam Thursfield <sam@afuera.me.uk>2014-09-01 10:26:53 +0000
commit6fba43005c52930ce0175df18122264c2f193448 (patch)
treedb32213293d61774e37298064c4cdcb331a8d9ff
parent864dff6aa72615a85e3c7180cfd65cacb66867da (diff)
downloadmorph-6fba43005c52930ce0175df18122264c2f193448.tar.gz
deploy: Allow extensions to write to Morph log file
This is achieved by passing it the write end of a pipe, so that the extension has somewhere to write debug messages without clobbering either stdout or stderr. Previously deployment extensions could only display status messages on stdout, which is good for telling the user what is happening but is not useful when trying to do post-mortem debugging, when more information is usually required. This uses the asyncore asynchronous event framework, which is rather specific to sockets, but can be made to work with file descriptors, and has the advantage that it's part of the python standard library. It relies on polling file descriptors, but there's a trick with pipes to use them as a notification that a process has exited: 1. Ensure the write-end of the pipe is only open in the process you want to know when it exits 2. Make sure the pipe's FDs are set in non-blocking read/write mode 3. Call select or poll on the read-end of the file descriptor 4. If select/poll says you can read from that FD, but you get an EOF, then the process has closed it, and if you know it doesn't do that in normal operation, then the process has terminated. It took a few weird hacks to get the async_chat module to unregister its event handlers on EOF, but the result is an event loop that is asleep until the kernel tells it that it has to do something.
-rw-r--r--morphlib/extensions.py116
-rw-r--r--morphlib/plugins/deploy_plugin.py53
2 files changed, 143 insertions, 26 deletions
diff --git a/morphlib/extensions.py b/morphlib/extensions.py
index 55478418..af6ba279 100644
--- a/morphlib/extensions.py
+++ b/morphlib/extensions.py
@@ -13,14 +13,21 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import cliapp
-import morphlib
+import asyncore
+import asynchat
import glob
+import logging
import os
-import sysbranchdir
import stat
+import subprocess
import tempfile
+import cliapp
+
+import morphlib
+import sysbranchdir
+
+
class ExtensionError(morphlib.Error):
pass
@@ -149,3 +156,106 @@ class get_extension_filename():
def __exit__(self, type, value, trace):
if self.delete:
os.remove(self.ext_filename)
+
+
+class _EOFWrapper(asyncore.file_wrapper):
+ '''File object that reports when it hits EOF
+
+ The async_chat class doesn't notice that its input file has hit EOF,
+ so if we give it one of these instead, it will mark the chatter for
+ closiure and ensure any in-progress buffers are flushed.
+ '''
+ def __init__(self, dispatcher, fd):
+ self._dispatcher = dispatcher
+ asyncore.file_wrapper.__init__(self, fd)
+
+ def recv(self, *args):
+ data = asyncore.file_wrapper.recv(self, *args)
+ if not data:
+ self._dispatcher.close_when_done()
+ # ensure any unterminated data is flushed
+ return self._dispatcher.get_terminator()
+ return data
+
+
+class _OutputDispatcher(asynchat.async_chat, asyncore.file_dispatcher):
+ '''asyncore dispatcher that calls line_handler per line.'''
+ def __init__(self, fd, line_handler, map=None):
+ asynchat.async_chat.__init__(self, sock=None, map=map)
+ asyncore.file_dispatcher.__init__(self, fd=fd, map=map)
+ self.set_terminator('\n')
+ self._line_handler = line_handler
+ collect_incoming_data = asynchat.async_chat._collect_incoming_data
+ def set_file(self, fd):
+ self.socket = _EOFWrapper(self, fd)
+ self._fileno = self.socket.fileno()
+ self.add_channel()
+ def found_terminator(self):
+ self._line_handler(''.join(self.incoming))
+ self.incoming = []
+
+class ExtensionSubprocess(object):
+
+ def __init__(self, report_stdout, report_stderr, report_logger):
+ self._report_stdout = report_stdout
+ self._report_stderr = report_stderr
+ self._report_logger = report_logger
+
+ def run(self, filename, args, cwd, env):
+ '''Run an extension.
+
+ Anything written by the extension to stdout is passed to status(), thus
+ normally echoed to Morph's stdout. An extra FD is passed in the
+ environment variable MORPH_LOG_FD, and anything written here will be
+ included as debug messages in Morph's log file.
+
+ '''
+
+ log_read_fd, log_write_fd = os.pipe()
+
+ try:
+ new_env = env.copy()
+ new_env['MORPH_LOG_FD'] = str(log_write_fd)
+
+ # Because we don't have python 3.2's pass_fds, we have to
+ # play games with preexec_fn to close the fds we don't
+ # need to inherit
+ def close_read_end():
+ os.close(log_read_fd)
+ p = subprocess.Popen(
+ [filename] + args, cwd=cwd, env=new_env,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ preexec_fn=close_read_end)
+ os.close(log_write_fd)
+ log_write_fd = None
+
+ return self._watch_extension_subprocess(p, log_read_fd)
+ finally:
+ os.close(log_read_fd)
+ if log_write_fd is not None:
+ os.close(log_write_fd)
+
+ def _watch_extension_subprocess(self, p, log_read_fd):
+ '''Follow stdout, stderr and log output of an extension subprocess.'''
+
+ try:
+ socket_map = {}
+ for handler, fd in ((self._report_stdout, p.stdout),
+ (self._report_stderr, p.stderr),
+ (self._report_logger, log_read_fd)):
+ _OutputDispatcher(line_handler=handler, fd=fd,
+ map=socket_map)
+ asyncore.loop(use_poll=True, map=socket_map)
+
+ returncode = p.wait()
+ assert returncode is not None
+ except BaseException as e:
+ logging.debug('Received exception %r watching extension' % e)
+ p.terminate()
+ p.wait()
+ raise
+ finally:
+ p.stdout.close()
+ p.stderr.close()
+
+ return returncode
diff --git a/morphlib/plugins/deploy_plugin.py b/morphlib/plugins/deploy_plugin.py
index f3e43fa8..a80079fa 100644
--- a/morphlib/plugins/deploy_plugin.py
+++ b/morphlib/plugins/deploy_plugin.py
@@ -14,18 +14,19 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import cliapp
-import contextlib
import json
+import logging
import os
import shutil
-import stat
+import sys
import tarfile
import tempfile
import uuid
+import cliapp
import morphlib
+
class DeployPlugin(cliapp.Plugin):
def enable(self):
@@ -540,33 +541,39 @@ class DeployPlugin(cliapp.Plugin):
self.app.status(msg='Cleaning up')
shutil.rmtree(deploy_private_tempdir)
+ def _report_extension_stdout(self, line):
+ self.app.status(msg=line.replace('%s', '%%'))
+ def _report_extension_stderr(self, error_list):
+ def cb(line):
+ error_list.append(line)
+ sys.stderr.write('%s\n' % line)
+ return cb
+ def _report_extension_logger(self, name, kind):
+ return lambda line: logging.debug('%s%s: %s', name, kind, line)
def _run_extension(self, gd, name, kind, args, env):
'''Run an extension.
-
+
The ``kind`` should be either ``.configure`` or ``.write``,
depending on the kind of extension that is sought.
-
+
The extension is found either in the git repository of the
system morphology (repo, ref), or with the Morph code.
-
+
'''
- with morphlib.extensions.get_extension_filename(
- name, kind) as ext_filename:
- self.app.status(msg='Running extension %(name)s%(kind)s',
- name=name, kind=kind)
- self.app.runcmd(
- [ext_filename] + args,
- ['sh',
- '-c',
- 'while read l; do echo `date "+%F %T"` "$1$l"; done',
- '-',
- '%s[%s]' % (self.app.status_prefix, name + kind)],
- cwd=gd.dirname, env=env, stdout=None, stderr=None)
-
- def _is_executable(self, filename):
- st = os.stat(filename)
- mask = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
- return (stat.S_IMODE(st.st_mode) & mask) != 0
+ error_list = []
+ with morphlib.extensions.get_extension_filename(name, kind) as fn:
+ ext = morphlib.extensions.ExtensionSubprocess(
+ report_stdout=self._report_extension_stdout,
+ report_stderr=self._report_extension_stderr(error_list),
+ report_logger=self._report_extension_logger(name, kind),
+ )
+ returncode = ext.run(fn, args, env=env, cwd=gd.dirname)
+ if returncode == 0:
+ logging.info('%s%s succeeded', name, kind)
+ else:
+ message = '%s%s failed with code %s: %s' % (
+ name, kind, returncode, '\n'.join(error_list))
+ raise cliapp.AppException(message)
def create_metadata(self, system_artifact, root_repo_dir, deployment_type,
location, env):