diff options
| author | Eric Urban <hydrogen18@gmail.com> | 2013-10-06 21:23:20 -0400 |
|---|---|---|
| committer | Sergey Shepelev <temotor@gmail.com> | 2015-02-21 18:12:48 +0300 |
| commit | e536cfaea7616b5e76b311bc0ae87858f3409422 (patch) | |
| tree | e78aa223d0397e3dfb903a960d8c3fc455968b2d /tests/backdoor_test.py | |
| parent | 52d42dd741fc8da7f1b1ac525ff6c9f2f5739ee7 (diff) | |
| download | eventlet-tm3.tar.gz | |
backdoor: support UNIX sockets and IPv6tm3
https://bitbucket.org/eventlet/eventlet/issue/157
Diffstat (limited to 'tests/backdoor_test.py')
| -rw-r--r-- | tests/backdoor_test.py | 69 |
1 files changed, 66 insertions, 3 deletions
diff --git a/tests/backdoor_test.py b/tests/backdoor_test.py index 6facffe..e91bbbc 100644 --- a/tests/backdoor_test.py +++ b/tests/backdoor_test.py @@ -1,11 +1,25 @@ +import os + import eventlet from eventlet import backdoor from eventlet.green import socket +import tests + + +SOCKET_PATH = '/tmp/eventlet_backdoor_test.socket' + -from tests import LimitedTestCase, main +def silent_unlink(path): + try: + os.unlink(SOCKET_PATH) + except OSError: + pass -class BackdoorTest(LimitedTestCase): +class BackdoorTest(tests.LimitedTestCase): + def tearDown(self): + silent_unlink(SOCKET_PATH) + def test_server(self): listener = socket.socket() listener.bind(('localhost', 0)) @@ -29,6 +43,55 @@ class BackdoorTest(LimitedTestCase): # wait for the console to discover that it's dead eventlet.sleep(0.1) + def test_server_on_ipv6(self): + listener = socket.socket(socket.AF_INET6) + listener.bind(('::1', 0)) + listener.listen(5) + serv = eventlet.spawn(backdoor.backdoor_server, listener) + client = socket.socket(socket.AF_INET6) + client.connect(listener.getsockname()) + f = client.makefile('rw') + self.assert_('Python' in f.readline()) + f.readline() # build info + f.readline() # help info + self.assert_('InteractiveConsole' in f.readline()) + self.assertEquals('>>> ', f.read(4)) + f.write('print("hi")\n') + f.flush() + self.assertEquals('hi\n', f.readline()) + self.assertEquals('>>> ', f.read(4)) + f.write('exit()\n') + f.close() + client.close() + serv.kill() + # wait for the console to discover that it's dead + eventlet.sleep(0.1) + + def test_server_on_unix_socket(self): + silent_unlink(SOCKET_PATH) + listener = socket.socket(socket.AF_UNIX) + listener.bind(SOCKET_PATH) + listener.listen(5) + serv = eventlet.spawn(backdoor.backdoor_server, listener) + client = socket.socket(socket.AF_UNIX) + client.connect(SOCKET_PATH) + f = client.makefile('rw') + self.assert_('Python' in f.readline()) + f.readline() # build info + f.readline() # help info + self.assert_('InteractiveConsole' in f.readline()) + self.assertEquals('>>> ', f.read(4)) + f.write('print("hi")\n') + f.flush() + self.assertEquals('hi\n', f.readline()) + self.assertEquals('>>> ', f.read(4)) + f.write('exit()\n') + f.close() + client.close() + serv.kill() + # wait for the console to discover that it's dead + eventlet.sleep(0.1) + if __name__ == '__main__': - main() + tests.main() |
