summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2015-04-07 18:17:10 +0000
committerPedro Alvarez <pedro.alvarez@codethink.co.uk>2015-04-22 08:22:39 +0000
commit124e2f1b68a5176dcb3bf37543d4d9ff26a14b95 (patch)
treed00ad83aa71ea34876996823e0eb5523188f84f4
parent8103617c23fed8f07f23431d0b66ca6bcb6cba2d (diff)
downloadmorph-124e2f1b68a5176dcb3bf37543d4d9ff26a14b95.tar.gz
distbuild: Add test suite for distbuild-helper
This is mostly to check that the 'cancel entire subprocess tree' works as expected. Revert that patch and the test fails. There are also some tweaks included in this commit. Change-Id: If297522e6589ebb3a07dac66a39eb243789e53aa
-rwxr-xr-xdistbuild-helper14
-rw-r--r--distbuild/helper_router.py4
-rw-r--r--distbuild/json_router.py2
-rw-r--r--distbuild/mainloop.py65
-rw-r--r--distbuild/sockserv.py4
-rwxr-xr-xtest-distbuild-helper205
6 files changed, 289 insertions, 5 deletions
diff --git a/distbuild-helper b/distbuild-helper
index 1f648dd4..b1300fb0 100755
--- a/distbuild-helper
+++ b/distbuild-helper
@@ -32,6 +32,20 @@ import urlparse
import distbuild
+'''distbuild-helper
+
+This program exists to run other programs, and to make HTTP requests. It
+is used both by the distbuild controller (`morph controller-daemon` process)
+and the distbuild workers (`morph worker-daemon` processes).
+
+It communicates via JSON messages. For workers, it connects on the same port
+that the controller process connects. The controller has a special
+controller-helper-port which it uses for communication with its
+distbuild-helper process.
+
+'''
+
+
class HelperMachine(distbuild.StateMachine):
def __init__(self, conn):
diff --git a/distbuild/helper_router.py b/distbuild/helper_router.py
index 5578f750..22096b81 100644
--- a/distbuild/helper_router.py
+++ b/distbuild/helper_router.py
@@ -40,7 +40,7 @@ class HelperResult(object):
class HelperRouter(distbuild.StateMachine):
- '''Route JSON messages between helpers and other state machines.
+ '''Route JSON messages between controller and its helper.
This state machine relays and schedules access to one distbuild-helper
process. The helper process connects to a socket, which causes an
@@ -51,7 +51,7 @@ class HelperRouter(distbuild.StateMachine):
Other state machines in the same mainloop as HelperRouter can
request work from the helper process by emitting an event:
- * event source: the distbuild.HelperProcess class
+ * event source: the distbuild.HelperRouter class
* event: distbuild.HelperRequest instance
The HelperRequest event gets a message to be serialised as JSON.
diff --git a/distbuild/json_router.py b/distbuild/json_router.py
index d9c32a9c..0f41bfc6 100644
--- a/distbuild/json_router.py
+++ b/distbuild/json_router.py
@@ -22,7 +22,7 @@ import distbuild
class JsonRouter(distbuild.StateMachine):
- '''Route JSON messages between clients and helpers.
+ '''Route JSON messages between controller, worker and worker helper(s).
This state machine receives JSON messages from clients and helpers,
and routes messages between them.
diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py
index e7c0cc3b..d48d60ec 100644
--- a/distbuild/mainloop.py
+++ b/distbuild/mainloop.py
@@ -132,3 +132,68 @@ class MainLoop(object):
event_source, event = self._events.pop(0)
yield event_source, event
+
+
+class TestableMainLoop(MainLoop):
+ '''Special mainloop class with extra hooks for tests to use.
+
+ When writing a test, you often need to wait until a certain event has
+ happened, then examine that event. The run_until_event() and
+ run_until_new_state_machine() functions allow this.
+
+ '''
+ def __init__(self):
+ super(TestableMainLoop, self).__init__()
+
+ self._machines_added_this_cycle = []
+ self._events_sent_this_cycle = []
+
+ def add_state_machine(self, machine):
+ # Overriding the base class to monitor new state machines.
+ super(TestableMainLoop, self).add_state_machine(machine)
+ self._machines_added_this_cycle.append(machine)
+
+ def queue_event(self, event_source, event):
+ # Overriding the base class to monitor new events.
+ super(TestableMainLoop, self).queue_event(event_source, event)
+ self._events_sent_this_cycle.append((event_source, event))
+
+ def run_until_event(self, target_event_source, target_event_type):
+ '''Run the main loop continuously until a given event happens.
+
+ All queued messages will be processed before the loop exits.
+
+ '''
+ logging.debug('Running main loop until a %s event from %s.',
+ target_event_type, target_event_source)
+ while self._machines:
+ self._events_sent_this_cycle = []
+
+ self._run_once()
+
+ for event_source, event in self._events_sent_this_cycle:
+ if target_event_source == event_source:
+ if isinstance(event, target_event_type):
+ logging.debug(
+ 'Received %s from %s, exiting loop.', event,
+ event_source)
+ return event
+
+ def run_until_new_state_machine(self, target_machine_type):
+ '''Run the main loop continuously until a new state machine appears.
+
+ All queued messages will be processed before the loop exits.
+
+ '''
+ logging.debug('Running main loop until a new %s appears.',
+ target_machine_type)
+ while self._machines:
+ self._machines_added_this_cycle = []
+
+ self._run_once()
+
+ for machine in self._machines_added_this_cycle:
+ if type(machine) == target_machine_type:
+ logging.debug(
+ 'Found new machine %s, exiting loop.', machine)
+ return machine
diff --git a/distbuild/sockserv.py b/distbuild/sockserv.py
index c9979328..998cfb11 100644
--- a/distbuild/sockserv.py
+++ b/distbuild/sockserv.py
@@ -35,10 +35,10 @@ class ListenServer(StateMachine):
def setup(self):
src = ListeningSocketEventSource(self._addr, self._port)
+ _, self._port = src.sock.getsockname()
if self._port_file:
- host, port = src.sock.getsockname()
with open(self._port_file, 'w') as f:
- f.write('%s\n' % port)
+ f.write('%s\n' % self._port)
self.mainloop.add_event_source(src)
spec = [
diff --git a/test-distbuild-helper b/test-distbuild-helper
new file mode 100755
index 00000000..baf8ce82
--- /dev/null
+++ b/test-distbuild-helper
@@ -0,0 +1,205 @@
+#!/usr/bin/python
+#
+# test-distbuild-helper.py -- tests for distbuild-helper tool
+#
+# Copyright (C) 2015 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import logging
+import os
+import subprocess
+import sys
+import unittest
+
+import distbuild
+
+
+def assert_process_is_running(process_id):
+ assert os.path.exists('/proc/%s' % process_id), \
+ 'Expected process ID %s to be running, but it is not!' % process_id
+
+
+def assert_process_is_not_running(process_id):
+ assert not os.path.exists('/proc/%s' % process_id), \
+ 'Expected process ID %s to not be running, but it is!' % process_id
+
+
+class DistbuildHelperTestCase(unittest.TestCase):
+ '''Base class for `distbuild-helper` test cases.'''
+
+ def run_distbuild_helper(self, loop):
+ '''Run `distbuild-helper`, and wait for it to connect to this process.
+
+ An unused port is allocated for communication with the
+ `distbuild-helper` process.
+
+ Returns a subprocess.Popen instance, and the port number that was
+ allocated.
+
+ '''
+
+ logging.info('Setting up a listener for connections from the '
+ '`distbuild-helper` process.')
+ listener = distbuild.ListenServer(
+ addr='localhost', port=0, machine=distbuild.JsonRouter)
+ loop.add_state_machine(listener)
+
+ logging.info('Starting the `distbuild-helper` process.')
+ process = subprocess.Popen(
+ ['./distbuild-helper', '--parent-address', 'localhost',
+ '--parent-port', str(listener._port)])
+
+ logging.info('Waiting for connection from helper subprocess.')
+ json_router = loop.run_until_new_state_machine(
+ distbuild.JsonRouter)
+
+ return process, listener._port
+
+ def connect_to_helper(self, loop, port):
+ '''Returns a FakeWorker connected to the distbuild helper.
+
+ I hope this is not needed, and we can use the JsonMachine to talk to
+ the distbuild-helper process directly.'''
+
+ class ListenableJsonMachine(distbuild.JsonMachine):
+ '''Kludgy adapter around JsonMachine class.
+
+ The JsonMachine class is used for two-way communications over a
+ socket using JSON messages. This wrapper exists so we can have a
+ JsonMachine instance created by a ListenServer class. It simply
+ makes the prototype of __init__() match what ListenServer expects.
+
+ '''
+ def __init__(self, cm, conn):
+ super(ListenableJsonMachine, self).__init__(conn)
+
+ worker_connection_machine = distbuild.ConnectionMachine(
+ addr='localhost', port=port, machine=ListenableJsonMachine,
+ extra_args=[])
+ loop.add_state_machine(worker_connection_machine)
+
+ logging.info('Awaiting connection to worker port.')
+ jm = loop.run_until_new_state_machine(ListenableJsonMachine)
+
+ return jm
+
+ def assert_json_message(self, type):
+ '''Assert about the next message we receive from the helper.
+
+ Return the message that was received.
+
+ '''
+ event = self.helper_jm.mainloop.run_until_event(
+ self.helper_jm, distbuild.JsonNewMessage)
+ logging.debug('Received %s', event.msg)
+
+ assert event.msg['type'] == type, \
+ "Expected a JSON message of type %s from the helper, but " \
+ "received: %s" % (type, event.msg)
+
+ return event.msg
+
+ def send_exec_cancel(self, id):
+ '''Send an exec-cancel message to the helper process.'''
+ msg = distbuild.message('exec-cancel', id=id)
+ self.helper_jm.send(msg)
+
+ def send_exec_request(self, id, argv):
+ '''Send an exec-request message to the helper process.'''
+ msg = distbuild.message(
+ 'exec-request', id=id, argv=argv, stdin_contents='',
+ )
+ self.helper_jm.send(msg)
+
+
+class ExecutionTests(DistbuildHelperTestCase):
+ '''Test the execution and cancellation of subprocesses.'''
+
+ def setUp(self):
+ #logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
+
+ loop = distbuild.mainloop.TestableMainLoop()
+ self.helper_process, helper_port = self.run_distbuild_helper(loop)
+ self.helper_jm = self.connect_to_helper(loop, helper_port)
+
+ def tearDown(self):
+ self.helper_process.terminate()
+
+ def test_run_process_success(self):
+ '''Test that exec-request starts a process, and we get a response.'''
+
+ TEST_ARGV = ['sh', '-c', "echo test"]
+
+ self.send_exec_request(id='test1', argv=TEST_ARGV)
+
+ msg = self.assert_json_message(type='exec-output')
+ self.assertEqual(msg['stdout'], 'test\n')
+
+ msg = self.assert_json_message(type='exec-response')
+ self.assertEqual(msg['exit'], 0)
+
+ def test_cancel_process_tree(self):
+ '''Test that exec-cancel kills the entire process tree.
+
+ By default SIGKILL only kills the direct child process, but this may
+ have spawned other processes that might keep running.
+
+ To test that the process tree is killed, this test runs a script which
+ spawns another shell. The 2nd shell will output its process ID, then
+ sleep for 10 seconds, then output another message. If we see the second
+ message then we know that the SIGKILL didn't work correctly -- if the
+ 2nd shell had been killed, it would not have been able to output the
+ second message.
+
+ '''
+
+ # This shell script runs a subprocess that will sleep for 10 seconds,
+ # then output a line of text, unless killed during the sleep.
+
+ TEST_PROGRAM = '''
+ sh -c 'echo "$$ is child process ID";
+ sleep 10;
+ echo "Process was not killed."'
+ '''
+
+ # Start the subprocess and wait for the first line of output.
+
+ self.send_exec_request(id='test1', argv=['sh', '-c', TEST_PROGRAM])
+
+ msg = self.assert_json_message(type='exec-output')
+
+ stdout = msg['stdout']
+ self.assertIn('child process ID', stdout)
+
+ child_process_id = stdout.split()[0]
+ assert_process_is_running(child_process_id)
+
+ # Now cancel the subprocess, while it is in the 'sleep' command, and
+ # wait for the exec-response message. If we receive an exec-output
+ # message here instead then we know that the process wasn't killed
+ # during the 'sleep'.
+
+ self.send_exec_cancel(id='test1')
+
+ msg = self.assert_json_message(type='exec-response')
+
+ assert_process_is_not_running(child_process_id)
+
+ assert msg['exit'] == -9, \
+ 'Process was not killed -- exit code is %i' % msg['exit']
+
+
+if __name__ == '__main__':
+ unittest.main()