diff options
-rwxr-xr-x | distbuild-helper | 14 | ||||
-rw-r--r-- | distbuild/helper_router.py | 4 | ||||
-rw-r--r-- | distbuild/json_router.py | 2 | ||||
-rw-r--r-- | distbuild/mainloop.py | 65 | ||||
-rw-r--r-- | distbuild/sockserv.py | 4 | ||||
-rwxr-xr-x | test-distbuild-helper | 205 |
6 files changed, 289 insertions, 5 deletions
diff --git a/distbuild-helper b/distbuild-helper index 1f648dd4..b1300fb0 100755 --- a/distbuild-helper +++ b/distbuild-helper @@ -32,6 +32,20 @@ import urlparse import distbuild +'''distbuild-helper + +This program exists to run other programs, and to make HTTP requests. It +is used both by the distbuild controller (`morph controller-daemon` process) +and the distbuild workers (`morph worker-daemon` processes). + +It communicates via JSON messages. For workers, it connects on the same port +that the controller process connects. The controller has a special +controller-helper-port which it uses for communication with its +distbuild-helper process. + +''' + + class HelperMachine(distbuild.StateMachine): def __init__(self, conn): diff --git a/distbuild/helper_router.py b/distbuild/helper_router.py index 5578f750..22096b81 100644 --- a/distbuild/helper_router.py +++ b/distbuild/helper_router.py @@ -40,7 +40,7 @@ class HelperResult(object): class HelperRouter(distbuild.StateMachine): - '''Route JSON messages between helpers and other state machines. + '''Route JSON messages between controller and its helper. This state machine relays and schedules access to one distbuild-helper process. The helper process connects to a socket, which causes an @@ -51,7 +51,7 @@ class HelperRouter(distbuild.StateMachine): Other state machines in the same mainloop as HelperRouter can request work from the helper process by emitting an event: - * event source: the distbuild.HelperProcess class + * event source: the distbuild.HelperRouter class * event: distbuild.HelperRequest instance The HelperRequest event gets a message to be serialised as JSON. diff --git a/distbuild/json_router.py b/distbuild/json_router.py index d9c32a9c..0f41bfc6 100644 --- a/distbuild/json_router.py +++ b/distbuild/json_router.py @@ -22,7 +22,7 @@ import distbuild class JsonRouter(distbuild.StateMachine): - '''Route JSON messages between clients and helpers. + '''Route JSON messages between controller, worker and worker helper(s). This state machine receives JSON messages from clients and helpers, and routes messages between them. diff --git a/distbuild/mainloop.py b/distbuild/mainloop.py index e7c0cc3b..d48d60ec 100644 --- a/distbuild/mainloop.py +++ b/distbuild/mainloop.py @@ -132,3 +132,68 @@ class MainLoop(object): event_source, event = self._events.pop(0) yield event_source, event + + +class TestableMainLoop(MainLoop): + '''Special mainloop class with extra hooks for tests to use. + + When writing a test, you often need to wait until a certain event has + happened, then examine that event. The run_until_event() and + run_until_new_state_machine() functions allow this. + + ''' + def __init__(self): + super(TestableMainLoop, self).__init__() + + self._machines_added_this_cycle = [] + self._events_sent_this_cycle = [] + + def add_state_machine(self, machine): + # Overriding the base class to monitor new state machines. + super(TestableMainLoop, self).add_state_machine(machine) + self._machines_added_this_cycle.append(machine) + + def queue_event(self, event_source, event): + # Overriding the base class to monitor new events. + super(TestableMainLoop, self).queue_event(event_source, event) + self._events_sent_this_cycle.append((event_source, event)) + + def run_until_event(self, target_event_source, target_event_type): + '''Run the main loop continuously until a given event happens. + + All queued messages will be processed before the loop exits. + + ''' + logging.debug('Running main loop until a %s event from %s.', + target_event_type, target_event_source) + while self._machines: + self._events_sent_this_cycle = [] + + self._run_once() + + for event_source, event in self._events_sent_this_cycle: + if target_event_source == event_source: + if isinstance(event, target_event_type): + logging.debug( + 'Received %s from %s, exiting loop.', event, + event_source) + return event + + def run_until_new_state_machine(self, target_machine_type): + '''Run the main loop continuously until a new state machine appears. + + All queued messages will be processed before the loop exits. + + ''' + logging.debug('Running main loop until a new %s appears.', + target_machine_type) + while self._machines: + self._machines_added_this_cycle = [] + + self._run_once() + + for machine in self._machines_added_this_cycle: + if type(machine) == target_machine_type: + logging.debug( + 'Found new machine %s, exiting loop.', machine) + return machine diff --git a/distbuild/sockserv.py b/distbuild/sockserv.py index c9979328..998cfb11 100644 --- a/distbuild/sockserv.py +++ b/distbuild/sockserv.py @@ -35,10 +35,10 @@ class ListenServer(StateMachine): def setup(self): src = ListeningSocketEventSource(self._addr, self._port) + _, self._port = src.sock.getsockname() if self._port_file: - host, port = src.sock.getsockname() with open(self._port_file, 'w') as f: - f.write('%s\n' % port) + f.write('%s\n' % self._port) self.mainloop.add_event_source(src) spec = [ diff --git a/test-distbuild-helper b/test-distbuild-helper new file mode 100755 index 00000000..baf8ce82 --- /dev/null +++ b/test-distbuild-helper @@ -0,0 +1,205 @@ +#!/usr/bin/python +# +# test-distbuild-helper.py -- tests for distbuild-helper tool +# +# Copyright (C) 2015 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program. If not, see <http://www.gnu.org/licenses/>. + + +import logging +import os +import subprocess +import sys +import unittest + +import distbuild + + +def assert_process_is_running(process_id): + assert os.path.exists('/proc/%s' % process_id), \ + 'Expected process ID %s to be running, but it is not!' % process_id + + +def assert_process_is_not_running(process_id): + assert not os.path.exists('/proc/%s' % process_id), \ + 'Expected process ID %s to not be running, but it is!' % process_id + + +class DistbuildHelperTestCase(unittest.TestCase): + '''Base class for `distbuild-helper` test cases.''' + + def run_distbuild_helper(self, loop): + '''Run `distbuild-helper`, and wait for it to connect to this process. + + An unused port is allocated for communication with the + `distbuild-helper` process. + + Returns a subprocess.Popen instance, and the port number that was + allocated. + + ''' + + logging.info('Setting up a listener for connections from the ' + '`distbuild-helper` process.') + listener = distbuild.ListenServer( + addr='localhost', port=0, machine=distbuild.JsonRouter) + loop.add_state_machine(listener) + + logging.info('Starting the `distbuild-helper` process.') + process = subprocess.Popen( + ['./distbuild-helper', '--parent-address', 'localhost', + '--parent-port', str(listener._port)]) + + logging.info('Waiting for connection from helper subprocess.') + json_router = loop.run_until_new_state_machine( + distbuild.JsonRouter) + + return process, listener._port + + def connect_to_helper(self, loop, port): + '''Returns a FakeWorker connected to the distbuild helper. + + I hope this is not needed, and we can use the JsonMachine to talk to + the distbuild-helper process directly.''' + + class ListenableJsonMachine(distbuild.JsonMachine): + '''Kludgy adapter around JsonMachine class. + + The JsonMachine class is used for two-way communications over a + socket using JSON messages. This wrapper exists so we can have a + JsonMachine instance created by a ListenServer class. It simply + makes the prototype of __init__() match what ListenServer expects. + + ''' + def __init__(self, cm, conn): + super(ListenableJsonMachine, self).__init__(conn) + + worker_connection_machine = distbuild.ConnectionMachine( + addr='localhost', port=port, machine=ListenableJsonMachine, + extra_args=[]) + loop.add_state_machine(worker_connection_machine) + + logging.info('Awaiting connection to worker port.') + jm = loop.run_until_new_state_machine(ListenableJsonMachine) + + return jm + + def assert_json_message(self, type): + '''Assert about the next message we receive from the helper. + + Return the message that was received. + + ''' + event = self.helper_jm.mainloop.run_until_event( + self.helper_jm, distbuild.JsonNewMessage) + logging.debug('Received %s', event.msg) + + assert event.msg['type'] == type, \ + "Expected a JSON message of type %s from the helper, but " \ + "received: %s" % (type, event.msg) + + return event.msg + + def send_exec_cancel(self, id): + '''Send an exec-cancel message to the helper process.''' + msg = distbuild.message('exec-cancel', id=id) + self.helper_jm.send(msg) + + def send_exec_request(self, id, argv): + '''Send an exec-request message to the helper process.''' + msg = distbuild.message( + 'exec-request', id=id, argv=argv, stdin_contents='', + ) + self.helper_jm.send(msg) + + +class ExecutionTests(DistbuildHelperTestCase): + '''Test the execution and cancellation of subprocesses.''' + + def setUp(self): + #logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) + + loop = distbuild.mainloop.TestableMainLoop() + self.helper_process, helper_port = self.run_distbuild_helper(loop) + self.helper_jm = self.connect_to_helper(loop, helper_port) + + def tearDown(self): + self.helper_process.terminate() + + def test_run_process_success(self): + '''Test that exec-request starts a process, and we get a response.''' + + TEST_ARGV = ['sh', '-c', "echo test"] + + self.send_exec_request(id='test1', argv=TEST_ARGV) + + msg = self.assert_json_message(type='exec-output') + self.assertEqual(msg['stdout'], 'test\n') + + msg = self.assert_json_message(type='exec-response') + self.assertEqual(msg['exit'], 0) + + def test_cancel_process_tree(self): + '''Test that exec-cancel kills the entire process tree. + + By default SIGKILL only kills the direct child process, but this may + have spawned other processes that might keep running. + + To test that the process tree is killed, this test runs a script which + spawns another shell. The 2nd shell will output its process ID, then + sleep for 10 seconds, then output another message. If we see the second + message then we know that the SIGKILL didn't work correctly -- if the + 2nd shell had been killed, it would not have been able to output the + second message. + + ''' + + # This shell script runs a subprocess that will sleep for 10 seconds, + # then output a line of text, unless killed during the sleep. + + TEST_PROGRAM = ''' + sh -c 'echo "$$ is child process ID"; + sleep 10; + echo "Process was not killed."' + ''' + + # Start the subprocess and wait for the first line of output. + + self.send_exec_request(id='test1', argv=['sh', '-c', TEST_PROGRAM]) + + msg = self.assert_json_message(type='exec-output') + + stdout = msg['stdout'] + self.assertIn('child process ID', stdout) + + child_process_id = stdout.split()[0] + assert_process_is_running(child_process_id) + + # Now cancel the subprocess, while it is in the 'sleep' command, and + # wait for the exec-response message. If we receive an exec-output + # message here instead then we know that the process wasn't killed + # during the 'sleep'. + + self.send_exec_cancel(id='test1') + + msg = self.assert_json_message(type='exec-response') + + assert_process_is_not_running(child_process_id) + + assert msg['exit'] == -9, \ + 'Process was not killed -- exit code is %i' % msg['exit'] + + +if __name__ == '__main__': + unittest.main() |