summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Ipsum <richard.ipsum@codethink.co.uk>2014-03-17 13:43:20 +0000
committerRichard Ipsum <richard.ipsum@codethink.co.uk>2014-03-21 17:51:03 +0000
commitea5a6365888118075216b164a1498b3c4ebe88f5 (patch)
tree6a50d4105fc5d1943640375cbece4c338203cd4b
parent799646770f936d6a1f0e11cea4997f6d39c48c2f (diff)
downloadmorph-ea5a6365888118075216b164a1498b3c4ebe88f5.tar.gz
Add distbuild-helper
-rwxr-xr-xdistbuild-helper322
1 files changed, 322 insertions, 0 deletions
diff --git a/distbuild-helper b/distbuild-helper
new file mode 100755
index 00000000..9d70642e
--- /dev/null
+++ b/distbuild-helper
@@ -0,0 +1,322 @@
+#!/usr/bin/python
+#
+# distbuild-helper -- helper process for Morph distributed building
+#
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA..
+
+import cliapp
+import errno
+import fcntl
+import httplib
+import logging
+import os
+import signal
+import socket
+import subprocess
+import sys
+import time
+import urlparse
+
+import distbuild
+
+
+class FileReadable(object):
+
+ def __init__(self, request_id, p, f):
+ self.request_id = request_id
+ self.process = p
+ self.file = f
+
+
+class FileWriteable(object):
+
+ def __init__(self, request_id, p, f):
+ self.request_id = request_id
+ self.process = p
+ self.file = f
+
+
+class SubprocessEventSource(distbuild.EventSource):
+
+ def __init__(self):
+ self.procs = []
+ self.closed = False
+
+ def get_select_params(self):
+ r = []
+ w = []
+ for requst_id, p in self.procs:
+ if p.stdin_contents is not None:
+ w.append(p.stdin)
+ if p.stdout is not None:
+ r.append(p.stdout)
+ if p.stderr is not None:
+ r.append(p.stderr)
+ return r, w, [], None
+
+ def get_events(self, r, w, x):
+ events = []
+
+ for request_id, p in self.procs:
+ if p.stdin in w:
+ events.append(FileWriteable(request_id, p, p.stdin))
+ if p.stdout in r:
+ events.append(FileReadable(request_id, p, p.stdout))
+ if p.stderr in r:
+ events.append(FileReadable(request_id, p, p.stderr))
+
+ return events
+
+ def add(self, request_id, process):
+
+ self.procs.append((request_id, process))
+ distbuild.set_nonblocking(process.stdin)
+ distbuild.set_nonblocking(process.stdout)
+ distbuild.set_nonblocking(process.stderr)
+
+ def remove(self, process):
+ self.procs = [t for t in self.procs if t[1] != process]
+
+ def kill_by_id(self, request_id):
+ logging.debug('SES: Killing all processes for %s', request_id)
+ for id, process in self.procs:
+ if id == request_id:
+ logging.debug('SES: killing %s', repr(process))
+ process.kill()
+
+ def close(self):
+ self.procs = []
+ self.closed = True
+
+ def is_finished(self):
+ return self.closed
+
+
+class HelperMachine(distbuild.StateMachine):
+
+ def __init__(self, conn):
+ distbuild.StateMachine.__init__(self, 'waiting')
+ self.conn = conn
+ self.debug_messages = False
+
+ def setup(self):
+ distbuild.crash_point()
+
+ jm = self.jm = distbuild.JsonMachine(self.conn)
+ self.mainloop.add_state_machine(jm)
+
+ p = self.procsrc = SubprocessEventSource()
+ self.mainloop.add_event_source(p)
+
+ self.send_helper_ready(jm)
+
+ spec = [
+ ('waiting', jm, distbuild.JsonNewMessage, 'waiting', self.do),
+ ('waiting', jm, distbuild.JsonEof, None, self._eofed),
+ ('waiting', p, FileReadable, 'waiting', self._relay_exec_output),
+ ('waiting', p, FileWriteable, 'waiting', self._feed_stdin),
+ ]
+ self.add_transitions(spec)
+
+ def send_helper_ready(self, jm):
+ msg = {
+ 'type': 'helper-ready',
+ }
+ jm.send(msg)
+ logging.debug('HelperMachine: sent: %s', repr(msg))
+
+ def do(self, parent, event):
+ distbuild.crash_point()
+
+ logging.debug('JsonMachine: got: %s', repr(event.msg))
+ handlers = {
+ 'http-request': self.do_http_request,
+ 'exec-request': self.do_exec_request,
+ 'exec-cancel': self.do_exec_cancel,
+ }
+ handler = handlers.get(event.msg['type'])
+ handler(parent, event.msg)
+
+ def do_http_request(self, parent, msg):
+ distbuild.crash_point()
+
+ url = msg['url']
+ method = msg['method']
+ assert method in ('HEAD', 'GET')
+
+ logging.debug('JsonMachine: http request: %s %s' % (method, url))
+
+ schema, netloc, path, query, fragment = urlparse.urlsplit(url)
+ assert schema == 'http'
+ if query:
+ path += '?' + query
+
+ try:
+ conn = httplib.HTTPConnection(netloc)
+ conn.request(method, path)
+ except (socket.error, httplib.HTTPException), e:
+ status = 418 # teapot
+ data = str(e)
+ else:
+ res = conn.getresponse()
+ status = res.status
+ data = res.read()
+ conn.close()
+
+ response = {
+ 'type': 'http-response',
+ 'id': msg['id'],
+ 'status': status,
+ 'body': data,
+ }
+ parent.send(response)
+ logging.debug('JsonMachine: sent to parent: %s', repr(response))
+ self.send_helper_ready(parent)
+
+ def do_exec_request(self, parent, msg):
+ distbuild.crash_point()
+
+ argv = msg['argv']
+ stdin_contents = msg.get('stdin_contents', '')
+ logging.debug('JsonMachine: exec request: argv=%s', repr(argv))
+ logging.debug(
+ 'JsonMachine: exec request: stdin=%s', repr(stdin_contents))
+
+ p = subprocess.Popen(argv,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ p.stdin_contents = stdin_contents
+
+ self.procsrc.add(msg['id'], p)
+
+ def do_exec_cancel(self, parent, msg):
+ distbuild.crash_point()
+
+ self.procsrc.kill_by_id(msg['id'])
+
+ def _relay_exec_output(self, event_source, event):
+ distbuild.crash_point()
+
+ buf_size = 64
+ fd = event.file.fileno()
+ data = os.read(fd, buf_size)
+ if data:
+ if event.file == event.process.stdout:
+ stream = 'stdout'
+ other = 'stderr'
+ else:
+ stream = 'stderr'
+ other = 'stdout'
+ msg = {
+ 'type': 'exec-output',
+ 'id': event.request_id,
+ stream: data,
+ other: '',
+ }
+ logging.debug('JsonMachine: sent to parent: %s', repr(msg))
+ self.jm.send(msg)
+ else:
+ if event.file == event.process.stdout:
+ event.process.stdout.close()
+ event.process.stdout = None
+ else:
+ event.process.stderr.close()
+ event.process.stderr = None
+
+ if event.process.stdout == event.process.stderr == None:
+ event.process.wait()
+ self.procsrc.remove(event.process)
+ msg = {
+ 'type': 'exec-response',
+ 'id': event.request_id,
+ 'exit': event.process.returncode,
+ }
+ logging.debug('JsonMachine: sent to parent: %s', repr(msg))
+ self.jm.send(msg)
+ self.send_helper_ready(self.jm)
+
+ def _feed_stdin(self, event_source, event):
+ distbuild.crash_point()
+
+ fd = event.file.fileno()
+ try:
+ n = os.write(fd, event.process.stdin_contents)
+ except os.error, e:
+ # If other end closed the read end, stop writing.
+ if e.errno == errno.EPIPE:
+ logging.debug('JsonMachine: reader closed pipe')
+ event.process.stdin_contents = ''
+ else:
+ raise
+ else:
+ logging.debug('JsonMachine: fed %d bytes to stdin', n)
+ event.process.stdin_contents = event.process.stdin_contents[n:]
+ if event.process.stdin_contents == '':
+ logging.debug('JsonMachine: stdin contents finished, closing')
+ event.file.close()
+ event.process.stdin_contents = None
+
+ def _eofed(self, event_source, event):
+ distbuild.crash_point()
+ logging.info('eof from parent, closing')
+ event_source.close()
+ self.procsrc.close()
+
+
+class DistributedBuildHelper(cliapp.Application):
+
+ def add_settings(self):
+ self.settings.string(
+ ['parent-address'],
+ 'address (hostname/ip address) for parent',
+ metavar='HOSTNAME',
+ default='localhost')
+ self.settings.integer(
+ ['parent-port'],
+ 'port number for parent',
+ metavar='PORT',
+ default=3434)
+ self.settings.boolean(
+ ['debug-messages'],
+ 'log messages that are received?')
+ self.settings.string_list(
+ ['crash-condition'],
+ 'add FILENAME:FUNCNAME:MAXCALLS to list of crash conditions '
+ '(this is for testing only)',
+ metavar='FILENAME:FUNCNAME:MAXCALLS')
+
+ def process_args(self, args):
+ distbuild.add_crash_conditions(self.settings['crash-condition'])
+
+ # We don't want SIGPIPE, ever. It just kills us. We handle EPIPE
+ # instead.
+ signal.signal(signal.SIGPIPE, signal.SIG_IGN)
+
+ addr = self.settings['parent-address']
+ port = self.settings['parent-port']
+ conn = socket.socket()
+ conn.connect((addr, port))
+ helper = HelperMachine(conn)
+ helper.debug_messages = self.settings['debug-messages']
+ loop = distbuild.MainLoop()
+ loop.add_state_machine(helper)
+ loop.run()
+
+
+DistributedBuildHelper().run()
+