#!/usr/bin/python
#
# distbuild-helper -- helper process for Morph distributed building
#
# Copyright (C) 2014 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..


import cliapp
import errno
import fcntl
import httplib
import logging
import os
import signal
import socket
import subprocess
import sys
import time
import urlparse

import distbuild


class FileReadable(object):

    '''Event: one of a subprocess's output pipes was reported readable.

    Carries the id of the request that started the process, the process
    object itself, and the file object (stdout or stderr) to read from.

    '''

    def __init__(self, request_id, p, f):
        self.request_id = request_id
        self.process = p
        self.file = f


class FileWriteable(object):

    '''Event: a subprocess's stdin pipe was reported writeable.

    Carries the id of the request that started the process, the process
    object itself, and the file object (stdin) to write to.

    '''

    def __init__(self, request_id, p, f):
        self.request_id = request_id
        self.process = p
        self.file = f


class SubprocessEventSource(distbuild.EventSource):

    '''Event source watching the stdio pipes of running subprocesses.

    ``procs`` is a list of ``(request_id, process)`` tuples. Each process
    is expected to have ``stdin``, ``stdout`` and ``stderr`` file objects
    and a ``stdin_contents`` attribute holding the data still to be fed
    to its stdin (``None`` once stdin has been closed).

    '''

    def __init__(self):
        self.procs = []
        self.closed = False

    def get_select_params(self):
        '''Return the files to watch as ``(r, w, [], None)``.

        ``r`` holds every stdout/stderr that has not been set to None,
        ``w`` holds the stdin of every process that still has
        ``stdin_contents``. The last two values are presumably the
        exceptional-condition list and the timeout, as for select() --
        confirm against distbuild.EventSource.

        '''

        r = []
        w = []
        for _request_id, p in self.procs:
            if p.stdin_contents is not None:
                w.append(p.stdin)
            if p.stdout is not None:
                r.append(p.stdout)
            if p.stderr is not None:
                r.append(p.stderr)
        return r, w, [], None

    def get_events(self, r, w, x):
        '''Turn ready files into FileWriteable/FileReadable events.

        ``r`` and ``w`` are the files reported readable and writeable;
        ``x`` is not used.

        '''

        events = []
        for request_id, p in self.procs:
            if p.stdin in w:
                events.append(FileWriteable(request_id, p, p.stdin))
            if p.stdout in r:
                events.append(FileReadable(request_id, p, p.stdout))
            if p.stderr in r:
                events.append(FileReadable(request_id, p, p.stderr))
        return events

    def add(self, request_id, process):
        '''Start watching ``process`` and make its pipes non-blocking.'''

        self.procs.append((request_id, process))
        distbuild.set_nonblocking(process.stdin)
        distbuild.set_nonblocking(process.stdout)
        distbuild.set_nonblocking(process.stderr)

    def remove(self, process):
        '''Stop watching ``process``.'''

        self.procs = [t for t in self.procs if t[1] != process]

    def kill_by_id(self, request_id):
        '''Call kill() on every watched process started for ``request_id``.'''

        logging.debug('SES: Killing all processes for %s', request_id)
        for proc_id, process in self.procs:
            if proc_id == request_id:
                logging.debug('SES: killing %s', repr(process))
                process.kill()

    def close(self):
        '''Forget all processes and mark this source as finished.'''

        self.procs = []
        self.closed = True

    def is_finished(self):
        '''Return True once close() has been called.'''

        return self.closed


class HelperMachine(distbuild.StateMachine):

    '''State machine serving requests received over ``conn``.

    It has a single state, 'waiting'. JSON messages from the parent are
    dispatched by their 'type' field to the HTTP and exec handlers;
    subprocess pipe events are handled by relaying output to the parent
    and feeding stdin.

    '''

    def __init__(self, conn):
        distbuild.StateMachine.__init__(self, 'waiting')
        self.conn = conn
        self.debug_messages = False

    def setup(self):
        '''Create the JSON machine and subprocess source, add transitions.

        Uses ``self.mainloop``, which is not set in this file --
        presumably assigned when the machine is added to a main loop.

        '''

        distbuild.crash_point()

        jm = self.jm = distbuild.JsonMachine(self.conn)
        self.mainloop.add_state_machine(jm)

        p = self.procsrc = SubprocessEventSource()
        self.mainloop.add_event_source(p)

        self.send_helper_ready(jm)

        spec = [
            ('waiting', jm, distbuild.JsonNewMessage, 'waiting', self.do),
            ('waiting', jm, distbuild.JsonEof, None, self._eofed),
            ('waiting', p, FileReadable, 'waiting', self._relay_exec_output),
            ('waiting', p, FileWriteable, 'waiting', self._feed_stdin),
        ]
        self.add_transitions(spec)

    def send_helper_ready(self, jm):
        '''Send a 'helper-ready' message via ``jm``.'''

        msg = {
            'type': 'helper-ready',
        }
        jm.send(msg)
        logging.debug('HelperMachine: sent: %s', repr(msg))

    def do(self, parent, event):
        '''Dispatch an incoming JSON message to the handler for its type.

        Messages whose 'type' is missing or not one of 'http-request',
        'exec-request' or 'exec-cancel' are logged and ignored, rather
        than failing with a TypeError from calling None.

        '''

        distbuild.crash_point()

        logging.debug('JsonMachine: got: %s', repr(event.msg))
        handlers = {
            'http-request': self.do_http_request,
            'exec-request': self.do_exec_request,
            'exec-cancel': self.do_exec_cancel,
        }
        # Assumes event.msg is a dict -- confirm against JsonMachine.
        handler = handlers.get(event.msg.get('type'))
        if handler is None:
            logging.warning(
                'JsonMachine: ignoring message of unknown type: %s',
                repr(event.msg))
            return
        handler(parent, event.msg)

    def do_http_request(self, parent, msg):
        '''Perform the HTTP request described by ``msg``; send the result.

        ``msg`` must contain 'id', 'url', 'method', 'headers' and 'body'.
        Only HEAD, GET and POST over plain http are accepted (enforced
        with assertions). The parent receives an 'http-response' message
        with the status and body, followed by 'helper-ready'. Any
        socket or HTTP error, whether raised while sending the request
        or while reading the response, is reported as status 418 with
        the error text as body.

        '''

        distbuild.crash_point()

        url = msg['url']
        method = msg['method']
        headers = msg['headers']
        body = msg['body']
        assert method in ('HEAD', 'GET', 'POST')

        logging.debug('JsonMachine: http request: %s %s', method, url)

        schema, netloc, path, query, fragment = urlparse.urlsplit(url)
        assert schema == 'http'
        if query:
            # A request target must start with '/', so do not send a
            # bare '?query' when the URL has no path component.
            if not path:
                path = '/'
            path += '?' + query

        conn = None
        try:
            conn = httplib.HTTPConnection(netloc)
            if headers:
                conn.request(method, path, body, headers)
            else:
                conn.request(method, path, body)
            # Reading the response can fail just like sending the
            # request, so it is covered by the same error handling.
            res = conn.getresponse()
            status = res.status
            data = res.read()
        except (socket.error, httplib.HTTPException) as e:
            status = 418 # teapot
            data = str(e)
        finally:
            # Close on both the success and the error path.
            if conn is not None:
                conn.close()

        response = {
            'type': 'http-response',
            'id': msg['id'],
            'status': status,
            'body': data,
        }
        parent.send(response)
        logging.debug('JsonMachine: sent to parent: %s', repr(response))
        self.send_helper_ready(parent)

    def do_exec_request(self, parent, msg):
        '''Start the subprocess described by ``msg`` and watch its pipes.

        ``msg`` must contain 'id' and 'argv'; 'stdin_contents' is
        optional and defaults to the empty string. Output and the exit
        code are reported later by _relay_exec_output.

        '''

        distbuild.crash_point()

        argv = msg['argv']
        stdin_contents = msg.get('stdin_contents', '')
        logging.debug('JsonMachine: exec request: argv=%s', repr(argv))
        logging.debug(
            'JsonMachine: exec request: stdin=%s', repr(stdin_contents))

        # NOTE(review): an OSError from Popen (e.g. command not found)
        # is not handled here and propagates to the caller.
        p = subprocess.Popen(argv,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        p.stdin_contents = stdin_contents

        self.procsrc.add(msg['id'], p)

    def do_exec_cancel(self, parent, msg):
        '''Kill every subprocess started for the request ``msg['id']``.'''

        distbuild.crash_point()

        self.procsrc.kill_by_id(msg['id'])

    def _relay_exec_output(self, event_source, event):
        '''Forward subprocess output to the parent, or finish the request.

        Reads up to 16 KiB from the readable pipe. Data is sent as an
        'exec-output' message. An empty read means end of file: that
        pipe is closed, and once both stdout and stderr are closed the
        process is waited for, removed from the event source, and its
        exit code sent as 'exec-response' followed by 'helper-ready'.

        '''

        distbuild.crash_point()

        buf_size = 16 * 1024
        fd = event.file.fileno()
        data = os.read(fd, buf_size)
        process = event.process
        if data:
            if event.file == process.stdout:
                stream = 'stdout'
                other = 'stderr'
            else:
                stream = 'stderr'
                other = 'stdout'
            msg = {
                'type': 'exec-output',
                'id': event.request_id,
                stream: data,
                other: '',
            }
            logging.debug('JsonMachine: sent to parent: %s', repr(msg))
            self.jm.send(msg)
        else:
            if event.file == process.stdout:
                process.stdout.close()
                process.stdout = None
            else:
                process.stderr.close()
                process.stderr = None

            if process.stdout is None and process.stderr is None:
                process.wait()
                self.procsrc.remove(process)
                msg = {
                    'type': 'exec-response',
                    'id': event.request_id,
                    'exit': process.returncode,
                }
                logging.debug('JsonMachine: sent to parent: %s', repr(msg))
                self.jm.send(msg)
                self.send_helper_ready(self.jm)

    def _feed_stdin(self, event_source, event):
        '''Write pending ``stdin_contents`` to the subprocess's stdin.

        Whatever was written is dropped from ``stdin_contents``. When
        nothing remains, or the subprocess has closed its end (EPIPE),
        stdin is closed and ``stdin_contents`` set to None so the pipe
        is no longer watched. Other OS errors are re-raised.

        '''

        distbuild.crash_point()

        fd = event.file.fileno()
        try:
            n = os.write(fd, event.process.stdin_contents)
        except os.error as e:
            # If other end closed the read end, stop writing.
            if e.errno == errno.EPIPE:
                logging.debug('JsonMachine: reader closed pipe')
                event.process.stdin_contents = ''
            else:
                raise
        else:
            logging.debug('JsonMachine: fed %d bytes to stdin', n)
            event.process.stdin_contents = event.process.stdin_contents[n:]
        if event.process.stdin_contents == '':
            logging.debug('JsonMachine: stdin contents finished, closing')
            event.file.close()
            event.process.stdin_contents = None

    def _eofed(self, event_source, event):
        '''Close the JSON connection and subprocess source on parent EOF.'''

        distbuild.crash_point()
        logging.info('eof from parent, closing')
        event_source.close()
        self.procsrc.close()


class DistributedBuildHelper(cliapp.Application):

    '''Command line application that connects to the parent and serves it.'''

    def add_settings(self):
        '''Declare the command line settings.'''

        self.settings.string(
            ['parent-address'],
            'address (hostname/ip address) for parent',
            metavar='HOSTNAME',
            default='localhost')
        self.settings.integer(
            ['parent-port'],
            'port number for parent',
            metavar='PORT',
            default=3434)
        self.settings.boolean(
            ['debug-messages'],
            'log messages that are received?')
        self.settings.string_list(
            ['crash-condition'],
            'add FILENAME:FUNCNAME:MAXCALLS to list of crash conditions '
            '(this is for testing only)',
            metavar='FILENAME:FUNCNAME:MAXCALLS')

    def process_args(self, args):
        '''Connect to the parent and run the main loop with a HelperMachine.'''

        distbuild.add_crash_conditions(self.settings['crash-condition'])

        # We don't want SIGPIPE, ever. It just kills us. We handle EPIPE
        # instead.
        signal.signal(signal.SIGPIPE, signal.SIG_IGN)

        addr = self.settings['parent-address']
        port = self.settings['parent-port']
        conn = distbuild.create_socket()
        conn.connect((addr, port))
        helper = HelperMachine(conn)
        helper.debug_messages = self.settings['debug-messages']
        loop = distbuild.MainLoop()
        loop.add_state_machine(helper)
        loop.run()


DistributedBuildHelper().run()