#!/usr/bin/python
#
# distbuild-helper -- helper process for Morph distributed building
#
# Copyright (C) 2014-2015  Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.


import cliapp
import errno
import fcntl
import httplib
import logging
import os
import signal
import socket
import subprocess
import sys
import time
import urlparse

import distbuild


'''distbuild-helper

This program exists to run other programs, and to make HTTP requests.
It is used both by the distbuild controller (`morph controller-daemon`
process) and the distbuild workers (`morph worker-daemon` processes).

It communicates via JSON messages. For workers, it connects on the same
port that the controller process connects. The controller has a special
controller-helper-port which it uses for communication with its
distbuild-helper process.

'''


class HelperMachine(distbuild.StateMachine):

    '''State machine that serves requests arriving over one connection.

    It stays in the single state 'waiting'. JSON messages from the
    parent are dispatched by their 'type' field: 'http-request',
    'exec-request' and 'exec-cancel'. Results are sent back as JSON
    messages, followed by a 'helper-ready' message once a request has
    been completed.

    '''

    def __init__(self, conn):
        '''Remember the connection to the parent; see setup() for the rest.'''
        distbuild.StateMachine.__init__(self, 'waiting')
        self.conn = conn
        self.debug_messages = False

    def setup(self):
        '''Register the JSON machine and subprocess source, then announce.

        Adds a JsonMachine wrapping the connection and a
        SubprocessEventSource to the main loop, sends the first
        'helper-ready' message, and installs the transition table.

        '''

        distbuild.crash_point()

        jm = self.jm = distbuild.JsonMachine(self.conn)
        self.mainloop.add_state_machine(jm)

        p = self.procsrc = distbuild.SubprocessEventSource()
        self.mainloop.add_event_source(p)

        self.send_helper_ready(jm)

        spec = [
            ('waiting', jm, distbuild.JsonNewMessage, 'waiting', self.do),
            ('waiting', jm, distbuild.JsonEof, None, self._eofed),
            ('waiting', p, distbuild.FileReadable, 'waiting',
                self._relay_exec_output),
            ('waiting', p, distbuild.FileWriteable, 'waiting',
                self._feed_stdin),
        ]
        self.add_transitions(spec)

    def send_helper_ready(self, jm):
        '''Send a 'helper-ready' message through jm and log it.'''
        msg = {
            'type': 'helper-ready',
        }
        jm.send(msg)
        logging.debug('HelperMachine: sent: %s', repr(msg))

    def do(self, parent, event):
        '''Dispatch an incoming JSON message to the handler for its type.

        Messages whose 'type' is missing or not one of the known request
        types are logged and ignored, instead of crashing the helper
        with a TypeError from calling None.

        '''

        distbuild.crash_point()

        logging.debug('JsonMachine: got: %s', repr(event.msg))
        handlers = {
            'http-request': self.do_http_request,
            'exec-request': self.do_exec_request,
            'exec-cancel': self.do_exec_cancel,
        }
        # NOTE(review): assumes event.msg is a dict decoded from JSON.
        msg_type = event.msg.get('type')
        handler = handlers.get(msg_type)
        if handler is None:
            logging.error(
                'JsonMachine: ignoring message of unknown type: %s',
                repr(msg_type))
            return
        handler(parent, event.msg)

    def do_http_request(self, parent, msg):
        '''Perform the HTTP request described by msg and send the result.

        msg must carry 'url', 'method', 'headers', 'body' and 'id'. Only
        the methods HEAD, GET and POST and the 'http' URL scheme are
        accepted (enforced with assertions). The reply is an
        'http-response' message with the same 'id', followed by
        'helper-ready'.

        Any socket or HTTP error, whether raised while connecting,
        sending the request or reading the response, is reported to the
        parent as status 418 with the error text as the body.

        '''

        distbuild.crash_point()

        url = msg['url']
        method = msg['method']
        headers = msg['headers']
        body = msg['body']
        assert method in ('HEAD', 'GET', 'POST')

        logging.debug('JsonMachine: http request: %s %s', method, url)

        schema, netloc, path, query, fragment = urlparse.urlsplit(url)
        assert schema == 'http'
        if query:
            path += '?' + query

        try:
            conn = httplib.HTTPConnection(netloc)
            try:
                # An empty dict is what httplib uses when no headers
                # are passed, so falsy headers behave as before.
                conn.request(method, path, body, headers or {})
                res = conn.getresponse()
                status = res.status
                data = res.read()
            finally:
                # Close on failure too, so the socket is not leaked.
                conn.close()
        except (socket.error, httplib.HTTPException) as e:
            status = 418  # teapot
            data = str(e)

        response = {
            'type': 'http-response',
            'id': msg['id'],
            'status': status,
            'body': data,
        }
        parent.send(response)
        logging.debug('JsonMachine: sent to parent: %s', repr(response))
        self.send_helper_ready(parent)

    def do_exec_request(self, parent, msg):
        '''Start the subprocess described by msg and hand it to procsrc.

        msg carries 'argv', 'id' and optionally 'stdin_contents'
        (default ''). The child is started in its own process group with
        all three standard streams connected to pipes. The data to feed
        to its stdin is stored on the Popen object as stdin_contents.

        '''

        distbuild.crash_point()

        argv = msg['argv']
        stdin_contents = msg.get('stdin_contents', '')
        logging.debug('JsonMachine: exec request: argv=%s', repr(argv))
        logging.debug(
            'JsonMachine: exec request: stdin=%s', repr(stdin_contents))

        p = subprocess.Popen(argv,
                             preexec_fn=os.setpgrp,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)

        p.stdin_contents = stdin_contents

        self.procsrc.add(msg['id'], p)

    def do_exec_cancel(self, parent, msg):
        '''Ask procsrc to kill the process registered under msg['id'].'''
        distbuild.crash_point()

        self.procsrc.kill_by_id(msg['id'])

    def _relay_exec_output(self, event_source, event):
        '''Forward output from a child process, or finish it off at EOF.

        Reads up to 16 KiB from the readable file. Data is sent to the
        parent as an 'exec-output' message with the other stream's field
        set to ''. An empty read closes that stream; once both stdout
        and stderr are closed the process is waited for, removed from
        procsrc, and an 'exec-response' with its exit code is sent,
        followed by 'helper-ready'.

        '''

        distbuild.crash_point()

        buf_size = 16 * 1024
        fd = event.file.fileno()
        data = os.read(fd, buf_size)
        if data:
            if event.file == event.process.stdout:
                stream = 'stdout'
                other = 'stderr'
            else:
                stream = 'stderr'
                other = 'stdout'
            msg = {
                'type': 'exec-output',
                'id': event.request_id,
                stream: data,
                other: '',
            }
            logging.debug('JsonMachine: sent to parent: %s', repr(msg))
            self.jm.send(msg)
        else:
            if event.file == event.process.stdout:
                event.process.stdout.close()
                event.process.stdout = None
            else:
                event.process.stderr.close()
                event.process.stderr = None

            if (event.process.stdout is None and
                    event.process.stderr is None):
                event.process.wait()
                self.procsrc.remove(event.process)
                msg = {
                    'type': 'exec-response',
                    'id': event.request_id,
                    'exit': event.process.returncode,
                }
                logging.debug('JsonMachine: sent to parent: %s', repr(msg))
                self.jm.send(msg)
                self.send_helper_ready(self.jm)

    def _feed_stdin(self, event_source, event):
        '''Write pending stdin data to a child process.

        Writes as much of event.process.stdin_contents as os.write
        accepts and keeps the remainder. EPIPE is treated as "nothing
        left to write"; any other OS error propagates. When no data
        remains the file is closed and stdin_contents is set to None.

        '''

        distbuild.crash_point()

        fd = event.file.fileno()
        try:
            n = os.write(fd, event.process.stdin_contents)
        except OSError as e:
            # If other end closed the read end, stop writing.
            if e.errno == errno.EPIPE:
                logging.debug('JsonMachine: reader closed pipe')
                event.process.stdin_contents = ''
            else:
                raise
        else:
            logging.debug('JsonMachine: fed %d bytes to stdin', n)
            event.process.stdin_contents = event.process.stdin_contents[n:]

        if event.process.stdin_contents == '':
            logging.debug('JsonMachine: stdin contents finished, closing')
            event.file.close()
            event.process.stdin_contents = None

    def _eofed(self, event_source, event):
        '''Close the parent connection source and procsrc on EOF.'''
        distbuild.crash_point()
        logging.info('eof from parent, closing')
        event_source.close()
        self.procsrc.close()


class DistributedBuildHelper(cliapp.Application):

    '''Command line application that connects to the parent and serves it.'''

    def add_settings(self):
        '''Declare the command line / configuration settings.'''
        self.settings.string(
            ['parent-address'],
            'address (hostname/ip address) for parent',
            metavar='HOSTNAME',
            default='localhost')
        self.settings.integer(
            ['parent-port'],
            'port number for parent',
            metavar='PORT',
            default=3434)
        self.settings.boolean(
            ['debug-messages'],
            'log messages that are received?')
        self.settings.string_list(
            ['crash-condition'],
            'add FILENAME:FUNCNAME:MAXCALLS to list of crash conditions '
            '(this is for testing only)',
            metavar='FILENAME:FUNCNAME:MAXCALLS')

    def process_args(self, args):
        '''Connect to the parent and run a HelperMachine in a main loop.'''
        distbuild.add_crash_conditions(self.settings['crash-condition'])

        # We don't want SIGPIPE, ever. It just kills us. We handle EPIPE
        # instead.
        signal.signal(signal.SIGPIPE, signal.SIG_IGN)

        addr = self.settings['parent-address']
        port = self.settings['parent-port']
        conn = distbuild.create_socket()
        conn.connect((addr, port))
        helper = HelperMachine(conn)
        helper.debug_messages = self.settings['debug-messages']
        loop = distbuild.MainLoop()
        loop.add_state_machine(helper)
        loop.run()


DistributedBuildHelper().run()