diff options
-rw-r--r-- | morphlib/plugins/deploy_plugin.py | 123 |
1 files changed, 102 insertions, 21 deletions
diff --git a/morphlib/plugins/deploy_plugin.py b/morphlib/plugins/deploy_plugin.py index f3e43fa8..2ac98743 100644 --- a/morphlib/plugins/deploy_plugin.py +++ b/morphlib/plugins/deploy_plugin.py @@ -14,18 +14,23 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import cliapp -import contextlib +import errno +import fcntl import json +import logging import os import shutil -import stat +import subprocess +import sys import tarfile import tempfile +import time import uuid +import cliapp import morphlib + class DeployPlugin(cliapp.Plugin): def enable(self): @@ -540,33 +545,109 @@ class DeployPlugin(cliapp.Plugin): self.app.status(msg='Cleaning up') shutil.rmtree(deploy_private_tempdir) + def _set_nonblocking(self, fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) + flags = flags | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + def _run_extension(self, gd, name, kind, args, env): '''Run an extension. - + The ``kind`` should be either ``.configure`` or ``.write``, depending on the kind of extension that is sought. - + The extension is found either in the git repository of the system morphology (repo, ref), or with the Morph code. - + + Anything written by the extension to stdout is passed to status(), thus + normally echoed to Morph's stdout. An extra FD is passed in the + environment variable MORPH_LOG_FD, and anything written here will be + included as debug messages in Morph's log file. + ''' + with morphlib.extensions.get_extension_filename( name, kind) as ext_filename: - self.app.status(msg='Running extension %(name)s%(kind)s', - name=name, kind=kind) - self.app.runcmd( - [ext_filename] + args, - ['sh', - '-c', - 'while read l; do echo `date "+%F %T"` "$1$l"; done', - '-', - '%s[%s]' % (self.app.status_prefix, name + kind)], - cwd=gd.dirname, env=env, stdout=None, stderr=None) - - def _is_executable(self, filename): - st = os.stat(filename) - mask = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH - return (stat.S_IMODE(st.st_mode) & mask) != 0 + + log_read_fd, log_write_fd = os.pipe() + + try: + new_env = env.copy() + new_env['MORPH_LOG_FD'] = str(log_write_fd) + + p = subprocess.Popen( + [ext_filename] + args, cwd=gd.dirname, env=new_env, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + for fd in [log_read_fd, log_write_fd, p.stderr.fileno(), + p.stdout.fileno()]: + self._set_nonblocking(fd) + + self._watch_extension_subprocess(name, kind, p, log_read_fd) + finally: + os.close(log_read_fd) + os.close(log_write_fd) + + def _read_lines_nonblocking(self, fd): + # We must use os.read() directly here: see + # <http://bugs.python.org/issue1175#msg56041> + reads = [] + while True: + try: + data = os.read(fd, 1024) + if len(data) == 0: + break + reads.append(data) + except OSError as e: + if e.errno == errno.EWOULDBLOCK: + break + raise e + if len(reads) == 0: + return [] + else: + full_text = ''.join(reads) + lines = full_text.rstrip().split('\n') + return lines + + def _watch_extension_subprocess(self, name, kind, p, log_read_fd): + '''Follow stdout, stderr and log output of an extension subprocess.''' + error = [] + + try: + while True: + def escape(line): + return line.replace('%', '%%') + + for line in self._read_lines_nonblocking(p.stdout.fileno()): + self.app.status(msg=escape(line.rstrip())) + + for line in self._read_lines_nonblocking(p.stderr.fileno()): + error.append(line) + sys.stderr.write(line) + sys.stderr.write('\n') + + for line in self._read_lines_nonblocking(log_read_fd): + logging.debug('%s%s: %s', name, kind, line.rstrip()) + + if p.poll() is not None: + break + + time.sleep(0.2) + except BaseException as e: + logging.debug('Received exception %r watching extension' % e) + p.terminate() + p.wait() + raise + finally: + p.stdout.close() + p.stderr.close() + + if p.returncode == 0: + logging.info('%s%s succeeded', name, kind) + else: + message = '%s%s failed with code %s: %s' % ( + name, kind, p.returncode, '\n'.join(error)) + raise cliapp.AppException(message) def create_metadata(self, system_artifact, root_repo_dir, deployment_type, location, env): |