From ea5a6365888118075216b164a1498b3c4ebe88f5 Mon Sep 17 00:00:00 2001 From: Richard Ipsum Date: Mon, 17 Mar 2014 13:43:20 +0000 Subject: Add distbuild-helper --- distbuild-helper | 322 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 322 insertions(+) create mode 100755 distbuild-helper diff --git a/distbuild-helper b/distbuild-helper new file mode 100755 index 00000000..9d70642e --- /dev/null +++ b/distbuild-helper @@ -0,0 +1,322 @@ +#!/usr/bin/python +# +# distbuild-helper -- helper process for Morph distributed building +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. + +import cliapp +import errno +import fcntl +import httplib +import logging +import os +import signal +import socket +import subprocess +import sys +import time +import urlparse + +import distbuild + + +class FileReadable(object): + + def __init__(self, request_id, p, f): + self.request_id = request_id + self.process = p + self.file = f + + +class FileWriteable(object): + + def __init__(self, request_id, p, f): + self.request_id = request_id + self.process = p + self.file = f + + +class SubprocessEventSource(distbuild.EventSource): + + def __init__(self): + self.procs = [] + self.closed = False + + def get_select_params(self): + r = [] + w = [] + for requst_id, p in self.procs: + if p.stdin_contents is not None: + w.append(p.stdin) + if p.stdout is not None: + r.append(p.stdout) + if p.stderr is not None: + r.append(p.stderr) + return r, w, [], None + + def get_events(self, r, w, x): + events = [] + + for request_id, p in self.procs: + if p.stdin in w: + events.append(FileWriteable(request_id, p, p.stdin)) + if p.stdout in r: + events.append(FileReadable(request_id, p, p.stdout)) + if p.stderr in r: + events.append(FileReadable(request_id, p, p.stderr)) + + return events + + def add(self, request_id, process): + + self.procs.append((request_id, process)) + distbuild.set_nonblocking(process.stdin) + distbuild.set_nonblocking(process.stdout) + distbuild.set_nonblocking(process.stderr) + + def remove(self, process): + self.procs = [t for t in self.procs if t[1] != process] + + def kill_by_id(self, request_id): + logging.debug('SES: Killing all processes for %s', request_id) + for id, process in self.procs: + if id == request_id: + logging.debug('SES: killing %s', repr(process)) + process.kill() + + def close(self): + self.procs = [] + self.closed = True + + def is_finished(self): + return self.closed + + +class HelperMachine(distbuild.StateMachine): + + def __init__(self, conn): + distbuild.StateMachine.__init__(self, 'waiting') + self.conn = conn + self.debug_messages = False + + def setup(self): + distbuild.crash_point() + + jm = self.jm = distbuild.JsonMachine(self.conn) + self.mainloop.add_state_machine(jm) + + p = self.procsrc = SubprocessEventSource() + self.mainloop.add_event_source(p) + + self.send_helper_ready(jm) + + spec = [ + ('waiting', jm, distbuild.JsonNewMessage, 'waiting', self.do), + ('waiting', jm, distbuild.JsonEof, None, self._eofed), + ('waiting', p, FileReadable, 'waiting', self._relay_exec_output), + ('waiting', p, FileWriteable, 'waiting', self._feed_stdin), + ] + self.add_transitions(spec) + + def send_helper_ready(self, jm): + msg = { + 'type': 'helper-ready', + } + jm.send(msg) + logging.debug('HelperMachine: sent: %s', repr(msg)) + + def do(self, parent, event): + distbuild.crash_point() + + logging.debug('JsonMachine: got: %s', repr(event.msg)) + handlers = { + 'http-request': self.do_http_request, + 'exec-request': self.do_exec_request, + 'exec-cancel': self.do_exec_cancel, + } + handler = handlers.get(event.msg['type']) + handler(parent, event.msg) + + def do_http_request(self, parent, msg): + distbuild.crash_point() + + url = msg['url'] + method = msg['method'] + assert method in ('HEAD', 'GET') + + logging.debug('JsonMachine: http request: %s %s' % (method, url)) + + schema, netloc, path, query, fragment = urlparse.urlsplit(url) + assert schema == 'http' + if query: + path += '?' + query + + try: + conn = httplib.HTTPConnection(netloc) + conn.request(method, path) + except (socket.error, httplib.HTTPException), e: + status = 418 # teapot + data = str(e) + else: + res = conn.getresponse() + status = res.status + data = res.read() + conn.close() + + response = { + 'type': 'http-response', + 'id': msg['id'], + 'status': status, + 'body': data, + } + parent.send(response) + logging.debug('JsonMachine: sent to parent: %s', repr(response)) + self.send_helper_ready(parent) + + def do_exec_request(self, parent, msg): + distbuild.crash_point() + + argv = msg['argv'] + stdin_contents = msg.get('stdin_contents', '') + logging.debug('JsonMachine: exec request: argv=%s', repr(argv)) + logging.debug( + 'JsonMachine: exec request: stdin=%s', repr(stdin_contents)) + + p = subprocess.Popen(argv, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + p.stdin_contents = stdin_contents + + self.procsrc.add(msg['id'], p) + + def do_exec_cancel(self, parent, msg): + distbuild.crash_point() + + self.procsrc.kill_by_id(msg['id']) + + def _relay_exec_output(self, event_source, event): + distbuild.crash_point() + + buf_size = 64 + fd = event.file.fileno() + data = os.read(fd, buf_size) + if data: + if event.file == event.process.stdout: + stream = 'stdout' + other = 'stderr' + else: + stream = 'stderr' + other = 'stdout' + msg = { + 'type': 'exec-output', + 'id': event.request_id, + stream: data, + other: '', + } + logging.debug('JsonMachine: sent to parent: %s', repr(msg)) + self.jm.send(msg) + else: + if event.file == event.process.stdout: + event.process.stdout.close() + event.process.stdout = None + else: + event.process.stderr.close() + event.process.stderr = None + + if event.process.stdout == event.process.stderr == None: + event.process.wait() + self.procsrc.remove(event.process) + msg = { + 'type': 'exec-response', + 'id': event.request_id, + 'exit': event.process.returncode, + } + logging.debug('JsonMachine: sent to parent: %s', repr(msg)) + self.jm.send(msg) + self.send_helper_ready(self.jm) + + def _feed_stdin(self, event_source, event): + distbuild.crash_point() + + fd = event.file.fileno() + try: + n = os.write(fd, event.process.stdin_contents) + except os.error, e: + # If other end closed the read end, stop writing. + if e.errno == errno.EPIPE: + logging.debug('JsonMachine: reader closed pipe') + event.process.stdin_contents = '' + else: + raise + else: + logging.debug('JsonMachine: fed %d bytes to stdin', n) + event.process.stdin_contents = event.process.stdin_contents[n:] + if event.process.stdin_contents == '': + logging.debug('JsonMachine: stdin contents finished, closing') + event.file.close() + event.process.stdin_contents = None + + def _eofed(self, event_source, event): + distbuild.crash_point() + logging.info('eof from parent, closing') + event_source.close() + self.procsrc.close() + + +class DistributedBuildHelper(cliapp.Application): + + def add_settings(self): + self.settings.string( + ['parent-address'], + 'address (hostname/ip address) for parent', + metavar='HOSTNAME', + default='localhost') + self.settings.integer( + ['parent-port'], + 'port number for parent', + metavar='PORT', + default=3434) + self.settings.boolean( + ['debug-messages'], + 'log messages that are received?') + self.settings.string_list( + ['crash-condition'], + 'add FILENAME:FUNCNAME:MAXCALLS to list of crash conditions ' + '(this is for testing only)', + metavar='FILENAME:FUNCNAME:MAXCALLS') + + def process_args(self, args): + distbuild.add_crash_conditions(self.settings['crash-condition']) + + # We don't want SIGPIPE, ever. It just kills us. We handle EPIPE + # instead. + signal.signal(signal.SIGPIPE, signal.SIG_IGN) + + addr = self.settings['parent-address'] + port = self.settings['parent-port'] + conn = socket.socket() + conn.connect((addr, port)) + helper = HelperMachine(conn) + helper.debug_messages = self.settings['debug-messages'] + loop = distbuild.MainLoop() + loop.add_state_machine(helper) + loop.run() + + +DistributedBuildHelper().run() + -- cgit v1.2.1