#!/usr/bin/python # # distbuild-helper -- helper process for Morph distributed building # # Copyright (C) 2014 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 2 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.. import base64 import cliapp import errno import fcntl import httplib import logging import os import signal import socket import subprocess import sys import time import urlparse import distbuild class FileReadable(object): def __init__(self, request_id, p, f): self.request_id = request_id self.process = p self.file = f class FileWriteable(object): def __init__(self, request_id, p, f): self.request_id = request_id self.process = p self.file = f class SubprocessEventSource(distbuild.EventSource): def __init__(self): self.procs = [] self.closed = False def get_select_params(self): r = [] w = [] for requst_id, p in self.procs: if p.stdin_contents is not None: w.append(p.stdin) if p.stdout is not None: r.append(p.stdout) if p.stderr is not None: r.append(p.stderr) return r, w, [], None def get_events(self, r, w, x): events = [] for request_id, p in self.procs: if p.stdin in w: events.append(FileWriteable(request_id, p, p.stdin)) if p.stdout in r: events.append(FileReadable(request_id, p, p.stdout)) if p.stderr in r: events.append(FileReadable(request_id, p, p.stderr)) return events def add(self, request_id, process): self.procs.append((request_id, process)) distbuild.set_nonblocking(process.stdin) distbuild.set_nonblocking(process.stdout) distbuild.set_nonblocking(process.stderr) def remove(self, process): self.procs = [t for t in self.procs if t[1] != process] def kill_by_id(self, request_id): logging.debug('SES: Killing all processes for %s', request_id) for id, process in self.procs: if id == request_id: logging.debug('SES: killing %s', repr(process)) process.kill() def close(self): self.procs = [] self.closed = True def is_finished(self): return self.closed class HelperMachine(distbuild.StateMachine): def __init__(self, conn): distbuild.StateMachine.__init__(self, 'waiting') self.conn = conn self.debug_messages = False def setup(self): distbuild.crash_point() jm = self.jm = distbuild.JsonMachine(self.conn) self.mainloop.add_state_machine(jm) p = self.procsrc = SubprocessEventSource() self.mainloop.add_event_source(p) self.send_helper_ready(jm) spec = [ ('waiting', jm, distbuild.JsonNewMessage, 'waiting', self.do), ('waiting', jm, distbuild.JsonEof, None, self._eofed), ('waiting', p, FileReadable, 'waiting', self._relay_exec_output), ('waiting', p, FileWriteable, 'waiting', self._feed_stdin), ] self.add_transitions(spec) def send_helper_ready(self, jm): msg = { 'type': 'helper-ready', } jm.send(msg) logging.debug('HelperMachine: sent: %s', repr(msg)) def do(self, parent, event): distbuild.crash_point() logging.debug('JsonMachine: got: %s', repr(event.msg)) handlers = { 'http-request': self.do_http_request, 'exec-request': self.do_exec_request, 'exec-cancel': self.do_exec_cancel, } handler = handlers.get(event.msg['type']) handler(parent, event.msg) def do_http_request(self, parent, msg): distbuild.crash_point() url = msg['url'] method = msg['method'] headers = msg['headers'] body = msg['body'] assert method in ('HEAD', 'GET', 'POST') logging.debug('JsonMachine: http request: %s %s' % (method, url)) schema, netloc, path, query, fragment = urlparse.urlsplit(url) assert schema == 'http' if query: path += '?' + query try: conn = httplib.HTTPConnection(netloc) if headers: conn.request(method, path, body, headers) else: conn.request(method, path, body) except (socket.error, httplib.HTTPException), e: status = 418 # teapot data = str(e) else: res = conn.getresponse() status = res.status data = res.read() conn.close() response = { 'type': 'http-response', 'id': msg['id'], 'status': status, 'body': data, } parent.send(response) logging.debug('JsonMachine: sent to parent: %s', repr(response)) self.send_helper_ready(parent) def do_exec_request(self, parent, msg): distbuild.crash_point() argv = msg['argv'] stdin_contents = msg.get('stdin_contents', '') logging.debug('JsonMachine: exec request: argv=%s', repr(argv)) logging.debug( 'JsonMachine: exec request: stdin=%s', repr(stdin_contents)) p = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p.stdin_contents = stdin_contents self.procsrc.add(msg['id'], p) def do_exec_cancel(self, parent, msg): distbuild.crash_point() self.procsrc.kill_by_id(msg['id']) def _relay_exec_output(self, event_source, event): distbuild.crash_point() buf_size = 16 * 1024 fd = event.file.fileno() data = os.read(fd, buf_size) if data: if event.file == event.process.stdout: stream = 'stdout' other = 'stderr' else: stream = 'stderr' other = 'stdout' msg = { 'type': 'exec-output', 'id': event.request_id, stream: base64.standard_b64encode(data), other: base64.standard_b64encode(''), } logging.debug('JsonMachine: sent to parent: %s', repr(msg)) self.jm.send(msg) else: if event.file == event.process.stdout: event.process.stdout.close() event.process.stdout = None else: event.process.stderr.close() event.process.stderr = None if event.process.stdout == event.process.stderr == None: event.process.wait() self.procsrc.remove(event.process) msg = { 'type': 'exec-response', 'id': event.request_id, 'exit': event.process.returncode, } logging.debug('JsonMachine: sent to parent: %s', repr(msg)) self.jm.send(msg) self.send_helper_ready(self.jm) def _feed_stdin(self, event_source, event): distbuild.crash_point() fd = event.file.fileno() try: n = os.write(fd, event.process.stdin_contents) except os.error, e: # If other end closed the read end, stop writing. if e.errno == errno.EPIPE: logging.debug('JsonMachine: reader closed pipe') event.process.stdin_contents = '' else: raise else: logging.debug('JsonMachine: fed %d bytes to stdin', n) event.process.stdin_contents = event.process.stdin_contents[n:] if event.process.stdin_contents == '': logging.debug('JsonMachine: stdin contents finished, closing') event.file.close() event.process.stdin_contents = None def _eofed(self, event_source, event): distbuild.crash_point() logging.info('eof from parent, closing') event_source.close() self.procsrc.close() class DistributedBuildHelper(cliapp.Application): def add_settings(self): self.settings.string( ['parent-address'], 'address (hostname/ip address) for parent', metavar='HOSTNAME', default='localhost') self.settings.integer( ['parent-port'], 'port number for parent', metavar='PORT', default=3434) self.settings.boolean( ['debug-messages'], 'log messages that are received?') self.settings.string_list( ['crash-condition'], 'add FILENAME:FUNCNAME:MAXCALLS to list of crash conditions ' '(this is for testing only)', metavar='FILENAME:FUNCNAME:MAXCALLS') def process_args(self, args): distbuild.add_crash_conditions(self.settings['crash-condition']) # We don't want SIGPIPE, ever. It just kills us. We handle EPIPE # instead. signal.signal(signal.SIGPIPE, signal.SIG_IGN) addr = self.settings['parent-address'] port = self.settings['parent-port'] conn = distbuild.create_socket() conn.connect((addr, port)) helper = HelperMachine(conn) helper.debug_messages = self.settings['debug-messages'] loop = distbuild.MainLoop() loop.add_state_machine(helper) loop.run() DistributedBuildHelper().run()