summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--morphlib/extensions.py116
-rw-r--r--morphlib/plugins/deploy_plugin.py53
2 files changed, 143 insertions, 26 deletions
diff --git a/morphlib/extensions.py b/morphlib/extensions.py
index 5547841..af6ba27 100644
--- a/morphlib/extensions.py
+++ b/morphlib/extensions.py
@@ -13,14 +13,21 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import cliapp
-import morphlib
+import asyncore
+import asynchat
import glob
+import logging
import os
-import sysbranchdir
import stat
+import subprocess
import tempfile
+import cliapp
+
+import morphlib
+import sysbranchdir
+
+
class ExtensionError(morphlib.Error):
pass
@@ -149,3 +156,106 @@ class get_extension_filename():
def __exit__(self, type, value, trace):
if self.delete:
os.remove(self.ext_filename)
+
+
+class _EOFWrapper(asyncore.file_wrapper):
+ '''File object that reports when it hits EOF
+
+ The async_chat class doesn't notice that its input file has hit EOF,
+ so if we give it one of these instead, it will mark the chatter for
+ closiure and ensure any in-progress buffers are flushed.
+ '''
+ def __init__(self, dispatcher, fd):
+ self._dispatcher = dispatcher
+ asyncore.file_wrapper.__init__(self, fd)
+
+ def recv(self, *args):
+ data = asyncore.file_wrapper.recv(self, *args)
+ if not data:
+ self._dispatcher.close_when_done()
+ # ensure any unterminated data is flushed
+ return self._dispatcher.get_terminator()
+ return data
+
+
+class _OutputDispatcher(asynchat.async_chat, asyncore.file_dispatcher):
+ '''asyncore dispatcher that calls line_handler per line.'''
+ def __init__(self, fd, line_handler, map=None):
+ asynchat.async_chat.__init__(self, sock=None, map=map)
+ asyncore.file_dispatcher.__init__(self, fd=fd, map=map)
+ self.set_terminator('\n')
+ self._line_handler = line_handler
+ collect_incoming_data = asynchat.async_chat._collect_incoming_data
+ def set_file(self, fd):
+ self.socket = _EOFWrapper(self, fd)
+ self._fileno = self.socket.fileno()
+ self.add_channel()
+ def found_terminator(self):
+ self._line_handler(''.join(self.incoming))
+ self.incoming = []
+
+class ExtensionSubprocess(object):
+
+ def __init__(self, report_stdout, report_stderr, report_logger):
+ self._report_stdout = report_stdout
+ self._report_stderr = report_stderr
+ self._report_logger = report_logger
+
+ def run(self, filename, args, cwd, env):
+ '''Run an extension.
+
+ Anything written by the extension to stdout is passed to status(), thus
+ normally echoed to Morph's stdout. An extra FD is passed in the
+ environment variable MORPH_LOG_FD, and anything written here will be
+ included as debug messages in Morph's log file.
+
+ '''
+
+ log_read_fd, log_write_fd = os.pipe()
+
+ try:
+ new_env = env.copy()
+ new_env['MORPH_LOG_FD'] = str(log_write_fd)
+
+ # Because we don't have python 3.2's pass_fds, we have to
+ # play games with preexec_fn to close the fds we don't
+ # need to inherit
+ def close_read_end():
+ os.close(log_read_fd)
+ p = subprocess.Popen(
+ [filename] + args, cwd=cwd, env=new_env,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ preexec_fn=close_read_end)
+ os.close(log_write_fd)
+ log_write_fd = None
+
+ return self._watch_extension_subprocess(p, log_read_fd)
+ finally:
+ os.close(log_read_fd)
+ if log_write_fd is not None:
+ os.close(log_write_fd)
+
+ def _watch_extension_subprocess(self, p, log_read_fd):
+ '''Follow stdout, stderr and log output of an extension subprocess.'''
+
+ try:
+ socket_map = {}
+ for handler, fd in ((self._report_stdout, p.stdout),
+ (self._report_stderr, p.stderr),
+ (self._report_logger, log_read_fd)):
+ _OutputDispatcher(line_handler=handler, fd=fd,
+ map=socket_map)
+ asyncore.loop(use_poll=True, map=socket_map)
+
+ returncode = p.wait()
+ assert returncode is not None
+ except BaseException as e:
+ logging.debug('Received exception %r watching extension' % e)
+ p.terminate()
+ p.wait()
+ raise
+ finally:
+ p.stdout.close()
+ p.stderr.close()
+
+ return returncode
diff --git a/morphlib/plugins/deploy_plugin.py b/morphlib/plugins/deploy_plugin.py
index f3e43fa..a80079f 100644
--- a/morphlib/plugins/deploy_plugin.py
+++ b/morphlib/plugins/deploy_plugin.py
@@ -14,18 +14,19 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import cliapp
-import contextlib
import json
+import logging
import os
import shutil
-import stat
+import sys
import tarfile
import tempfile
import uuid
+import cliapp
import morphlib
+
class DeployPlugin(cliapp.Plugin):
def enable(self):
@@ -540,33 +541,39 @@ class DeployPlugin(cliapp.Plugin):
self.app.status(msg='Cleaning up')
shutil.rmtree(deploy_private_tempdir)
+ def _report_extension_stdout(self, line):
+ self.app.status(msg=line.replace('%s', '%%'))
+ def _report_extension_stderr(self, error_list):
+ def cb(line):
+ error_list.append(line)
+ sys.stderr.write('%s\n' % line)
+ return cb
+ def _report_extension_logger(self, name, kind):
+ return lambda line: logging.debug('%s%s: %s', name, kind, line)
def _run_extension(self, gd, name, kind, args, env):
'''Run an extension.
-
+
The ``kind`` should be either ``.configure`` or ``.write``,
depending on the kind of extension that is sought.
-
+
The extension is found either in the git repository of the
system morphology (repo, ref), or with the Morph code.
-
+
'''
- with morphlib.extensions.get_extension_filename(
- name, kind) as ext_filename:
- self.app.status(msg='Running extension %(name)s%(kind)s',
- name=name, kind=kind)
- self.app.runcmd(
- [ext_filename] + args,
- ['sh',
- '-c',
- 'while read l; do echo `date "+%F %T"` "$1$l"; done',
- '-',
- '%s[%s]' % (self.app.status_prefix, name + kind)],
- cwd=gd.dirname, env=env, stdout=None, stderr=None)
-
- def _is_executable(self, filename):
- st = os.stat(filename)
- mask = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
- return (stat.S_IMODE(st.st_mode) & mask) != 0
+ error_list = []
+ with morphlib.extensions.get_extension_filename(name, kind) as fn:
+ ext = morphlib.extensions.ExtensionSubprocess(
+ report_stdout=self._report_extension_stdout,
+ report_stderr=self._report_extension_stderr(error_list),
+ report_logger=self._report_extension_logger(name, kind),
+ )
+ returncode = ext.run(fn, args, env=env, cwd=gd.dirname)
+ if returncode == 0:
+ logging.info('%s%s succeeded', name, kind)
+ else:
+ message = '%s%s failed with code %s: %s' % (
+ name, kind, returncode, '\n'.join(error_list))
+ raise cliapp.AppException(message)
def create_metadata(self, system_artifact, root_repo_dir, deployment_type,
location, env):