summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xdistbuild-helper14
-rw-r--r--distbuild/helper_router.py4
-rw-r--r--distbuild/json_router.py2
-rw-r--r--distbuild/mainloop.py65
-rw-r--r--distbuild/sockserv.py4
-rwxr-xr-xtest-distbuild-helper205
6 files changed, 289 insertions, 5 deletions
diff --git a/distbuild-helper b/distbuild-helper
index 1f648dd4..b1300fb0 100755
--- a/distbuild-helper
+++ b/distbuild-helper
@@ -32,6 +32,20 @@ import urlparse
import distbuild
+'''distbuild-helper
+
+This program exists to run other programs, and to make HTTP requests. It
+is used both by the distbuild controller (`morph controller-daemon` process)
+and the distbuild workers (`morph worker-daemon` processes).
+
+It communicates via JSON messages. For workers, it connects on the same port
+that the controller process connects. The controller has a special
+controller-helper-port which it uses for communication with its
+distbuild-helper process.
+
+'''
+
+
class HelperMachine(distbuild.StateMachine):
def __init__(self, conn):
diff --git a/distbuild/helper_router.py b/distbuild/helper_router.py
index 5578f750..22096b81 100644
--- a/distbuild/helper_router.py
+++ b/distbuild/helper_router.py
@@ -40,7 +40,7 @@ class HelperResult(object):
class HelperRouter(distbuild.StateMachine):
- '''Route JSON messages between helpers and other state machines.
+ '''Route JSON messages between controller and its helper.
This state machine relays and schedules access to one distbuild-helper
process. The helper process connects to a socket, which causes an
@@ -51,7 +51,7 @@ class HelperRouter(distbuild.StateMachine):
Other state machines in the same mainloop as HelperRouter can
request work from the helper process by emitting an event:
- * event source: the distbuild.HelperProcess class
+ * event source: the distbuild.HelperRouter class
* event: distbuild.HelperRequest instance
The HelperRequest event gets a message to be serialised as JSON.
diff --git a/distbuild/json_router.py b/distbuild/json_router.py
index d9c32a9c..0f41bfc6 100644
--- a/distbuild/json_router.py
+++ b/distbuild/json_router.py
@@ -22,7 +22,7 @@ import distbuild
class JsonRouter(distbuild.StateMachine):
- '''Route JSON messages between clients and helpers.
+ '''Route JSON messages between controller, worker and worker helper(s).
This state machine receives JSON messages from clients and helpers,
and routes messages between them.
diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py
index e7c0cc3b..d48d60ec 100644
--- a/distbuild/mainloop.py
+++ b/distbuild/mainloop.py
@@ -132,3 +132,68 @@ class MainLoop(object):
event_source, event = self._events.pop(0)
yield event_source, event
+
+
+class TestableMainLoop(MainLoop):
+ '''Special mainloop class with extra hooks for tests to use.
+
+ When writing a test, you often need to wait until a certain event has
+ happened, then examine that event. The run_until_event() and
+ run_until_new_state_machine() functions allow this.
+
+ '''
+ def __init__(self):
+ super(TestableMainLoop, self).__init__()
+
+ self._machines_added_this_cycle = []
+ self._events_sent_this_cycle = []
+
+ def add_state_machine(self, machine):
+ # Overriding the base class to monitor new state machines.
+ super(TestableMainLoop, self).add_state_machine(machine)
+ self._machines_added_this_cycle.append(machine)
+
+ def queue_event(self, event_source, event):
+ # Overriding the base class to monitor new events.
+ super(TestableMainLoop, self).queue_event(event_source, event)
+ self._events_sent_this_cycle.append((event_source, event))
+
+ def run_until_event(self, target_event_source, target_event_type):
+ '''Run the main loop continuously until a given event happens.
+
+ All queued messages will be processed before the loop exits.
+
+ '''
+ logging.debug('Running main loop until a %s event from %s.',
+ target_event_type, target_event_source)
+ while self._machines:
+ self._events_sent_this_cycle = []
+
+ self._run_once()
+
+ for event_source, event in self._events_sent_this_cycle:
+ if target_event_source == event_source:
+ if isinstance(event, target_event_type):
+ logging.debug(
+ 'Received %s from %s, exiting loop.', event,
+ event_source)
+ return event
+
+ def run_until_new_state_machine(self, target_machine_type):
+ '''Run the main loop continuously until a new state machine appears.
+
+ All queued messages will be processed before the loop exits.
+
+ '''
+ logging.debug('Running main loop until a new %s appears.',
+ target_machine_type)
+ while self._machines:
+ self._machines_added_this_cycle = []
+
+ self._run_once()
+
+ for machine in self._machines_added_this_cycle:
+ if type(machine) == target_machine_type:
+ logging.debug(
+ 'Found new machine %s, exiting loop.', machine)
+ return machine
diff --git a/distbuild/sockserv.py b/distbuild/sockserv.py
index c9979328..998cfb11 100644
--- a/distbuild/sockserv.py
+++ b/distbuild/sockserv.py
@@ -35,10 +35,10 @@ class ListenServer(StateMachine):
def setup(self):
src = ListeningSocketEventSource(self._addr, self._port)
+ _, self._port = src.sock.getsockname()
if self._port_file:
- host, port = src.sock.getsockname()
with open(self._port_file, 'w') as f:
- f.write('%s\n' % port)
+ f.write('%s\n' % self._port)
self.mainloop.add_event_source(src)
spec = [
diff --git a/test-distbuild-helper b/test-distbuild-helper
new file mode 100755
index 00000000..baf8ce82
--- /dev/null
+++ b/test-distbuild-helper
@@ -0,0 +1,205 @@
+#!/usr/bin/python
+#
+# test-distbuild-helper.py -- tests for distbuild-helper tool
+#
+# Copyright (C) 2015 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import logging
+import os
+import subprocess
+import sys
+import unittest
+
+import distbuild
+
+
+def assert_process_is_running(process_id):
+ assert os.path.exists('/proc/%s' % process_id), \
+ 'Expected process ID %s to be running, but it is not!' % process_id
+
+
+def assert_process_is_not_running(process_id):
+ assert not os.path.exists('/proc/%s' % process_id), \
+ 'Expected process ID %s to not be running, but it is!' % process_id
+
+
+class DistbuildHelperTestCase(unittest.TestCase):
+ '''Base class for `distbuild-helper` test cases.'''
+
+ def run_distbuild_helper(self, loop):
+ '''Run `distbuild-helper`, and wait for it to connect to this process.
+
+ An unused port is allocated for communication with the
+ `distbuild-helper` process.
+
+ Returns a subprocess.Popen instance, and the port number that was
+ allocated.
+
+ '''
+
+ logging.info('Setting up a listener for connections from the '
+ '`distbuild-helper` process.')
+ listener = distbuild.ListenServer(
+ addr='localhost', port=0, machine=distbuild.JsonRouter)
+ loop.add_state_machine(listener)
+
+ logging.info('Starting the `distbuild-helper` process.')
+ process = subprocess.Popen(
+ ['./distbuild-helper', '--parent-address', 'localhost',
+ '--parent-port', str(listener._port)])
+
+ logging.info('Waiting for connection from helper subprocess.')
+ json_router = loop.run_until_new_state_machine(
+ distbuild.JsonRouter)
+
+ return process, listener._port
+
+ def connect_to_helper(self, loop, port):
+ '''Returns a FakeWorker connected to the distbuild helper.
+
+ I hope this is not needed, and we can use the JsonMachine to talk to
+ the distbuild-helper process directly.'''
+
+ class ListenableJsonMachine(distbuild.JsonMachine):
+ '''Kludgy adapter around JsonMachine class.
+
+ The JsonMachine class is used for two-way communications over a
+ socket using JSON messages. This wrapper exists so we can have a
+ JsonMachine instance created by a ListenServer class. It simply
+ makes the prototype of __init__() match what ListenServer expects.
+
+ '''
+ def __init__(self, cm, conn):
+ super(ListenableJsonMachine, self).__init__(conn)
+
+ worker_connection_machine = distbuild.ConnectionMachine(
+ addr='localhost', port=port, machine=ListenableJsonMachine,
+ extra_args=[])
+ loop.add_state_machine(worker_connection_machine)
+
+ logging.info('Awaiting connection to worker port.')
+ jm = loop.run_until_new_state_machine(ListenableJsonMachine)
+
+ return jm
+
+ def assert_json_message(self, type):
+ '''Assert about the next message we receive from the helper.
+
+ Return the message that was received.
+
+ '''
+ event = self.helper_jm.mainloop.run_until_event(
+ self.helper_jm, distbuild.JsonNewMessage)
+ logging.debug('Received %s', event.msg)
+
+ assert event.msg['type'] == type, \
+ "Expected a JSON message of type %s from the helper, but " \
+ "received: %s" % (type, event.msg)
+
+ return event.msg
+
+ def send_exec_cancel(self, id):
+ '''Send an exec-cancel message to the helper process.'''
+ msg = distbuild.message('exec-cancel', id=id)
+ self.helper_jm.send(msg)
+
+ def send_exec_request(self, id, argv):
+ '''Send an exec-request message to the helper process.'''
+ msg = distbuild.message(
+ 'exec-request', id=id, argv=argv, stdin_contents='',
+ )
+ self.helper_jm.send(msg)
+
+
+class ExecutionTests(DistbuildHelperTestCase):
+ '''Test the execution and cancellation of subprocesses.'''
+
+ def setUp(self):
+ #logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
+
+ loop = distbuild.mainloop.TestableMainLoop()
+ self.helper_process, helper_port = self.run_distbuild_helper(loop)
+ self.helper_jm = self.connect_to_helper(loop, helper_port)
+
+ def tearDown(self):
+ self.helper_process.terminate()
+
+ def test_run_process_success(self):
+ '''Test that exec-request starts a process, and we get a response.'''
+
+ TEST_ARGV = ['sh', '-c', "echo test"]
+
+ self.send_exec_request(id='test1', argv=TEST_ARGV)
+
+ msg = self.assert_json_message(type='exec-output')
+ self.assertEqual(msg['stdout'], 'test\n')
+
+ msg = self.assert_json_message(type='exec-response')
+ self.assertEqual(msg['exit'], 0)
+
+ def test_cancel_process_tree(self):
+ '''Test that exec-cancel kills the entire process tree.
+
+ By default SIGKILL only kills the direct child process, but this may
+ have spawned other processes that might keep running.
+
+ To test that the process tree is killed, this test runs a script which
+ spawns another shell. The 2nd shell will output its process ID, then
+ sleep for 10 seconds, then output another message. If we see the second
+ message then we know that the SIGKILL didn't work correctly -- if the
+ 2nd shell had been killed, it would not have been able to output the
+ second message.
+
+ '''
+
+ # This shell script runs a subprocess that will sleep for 10 seconds,
+ # then output a line of text, unless killed during the sleep.
+
+ TEST_PROGRAM = '''
+ sh -c 'echo "$$ is child process ID";
+ sleep 10;
+ echo "Process was not killed."'
+ '''
+
+ # Start the subprocess and wait for the first line of output.
+
+ self.send_exec_request(id='test1', argv=['sh', '-c', TEST_PROGRAM])
+
+ msg = self.assert_json_message(type='exec-output')
+
+ stdout = msg['stdout']
+ self.assertIn('child process ID', stdout)
+
+ child_process_id = stdout.split()[0]
+ assert_process_is_running(child_process_id)
+
+ # Now cancel the subprocess, while it is in the 'sleep' command, and
+ # wait for the exec-response message. If we receive an exec-output
+ # message here instead then we know that the process wasn't killed
+ # during the 'sleep'.
+
+ self.send_exec_cancel(id='test1')
+
+ msg = self.assert_json_message(type='exec-response')
+
+ assert_process_is_not_running(child_process_id)
+
+ assert msg['exit'] == -9, \
+ 'Process was not killed -- exit code is %i' % msg['exit']
+
+
+if __name__ == '__main__':
+ unittest.main()