diff options
Diffstat (limited to 'Lib/test/test_ftplib.py')
-rw-r--r-- | Lib/test/test_ftplib.py | 303 |
1 files changed, 285 insertions, 18 deletions
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py index 1c2ceeb872..91995825ab 100644 --- a/Lib/test/test_ftplib.py +++ b/Lib/test/test_ftplib.py @@ -1,17 +1,24 @@ """Test script for ftplib module.""" -# Modified by Giampaolo Rodola' to test FTP class and IPv6 environment +# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS +# environment import ftplib -import threading import asyncore import asynchat import socket import StringIO +import errno +import os +try: + import ssl +except ImportError: + ssl = None from unittest import TestCase from test import test_support from test.test_support import HOST +threading = test_support.import_module('threading') # the dummy data returned by server over the data channel when @@ -22,6 +29,7 @@ NLST_DATA = 'foo\r\nbar\r\n' class DummyDTPHandler(asynchat.async_chat): + dtp_conn_closed = False def __init__(self, conn, baseclass): asynchat.async_chat.__init__(self, conn) @@ -32,12 +40,22 @@ class DummyDTPHandler(asynchat.async_chat): self.baseclass.last_received_data += self.recv(1024) def handle_close(self): - self.baseclass.push('226 transfer complete') - self.close() + # XXX: this method can be called many times in a row for a single + # connection, including in clear-text (non-TLS) mode. + # (behaviour witnessed with test_data_connection) + if not self.dtp_conn_closed: + self.baseclass.push('226 transfer complete') + self.close() + self.dtp_conn_closed = True + + def handle_error(self): + raise class DummyFTPHandler(asynchat.async_chat): + dtp_handler = DummyDTPHandler + def __init__(self, conn): asynchat.async_chat.__init__(self, conn) self.set_terminator("\r\n") @@ -46,6 +64,7 @@ class DummyFTPHandler(asynchat.async_chat): self.last_received_cmd = None self.last_received_data = '' self.next_response = '' + self.rest = None self.push('220 welcome') def collect_incoming_data(self, data): @@ -80,38 +99,38 @@ class DummyFTPHandler(asynchat.async_chat): addr = map(int, arg.split(',')) ip = '%d.%d.%d.%d' %tuple(addr[:4]) port = (addr[4] * 256) + addr[5] - s = socket.create_connection((ip, port), timeout=2) - self.dtp = DummyDTPHandler(s, baseclass=self) + s = socket.create_connection((ip, port), timeout=10) + self.dtp = self.dtp_handler(s, baseclass=self) self.push('200 active data connection established') def cmd_pasv(self, arg): sock = socket.socket() sock.bind((self.socket.getsockname()[0], 0)) sock.listen(5) - sock.settimeout(2) + sock.settimeout(10) ip, port = sock.getsockname()[:2] ip = ip.replace('.', ',') p1, p2 = divmod(port, 256) self.push('227 entering passive mode (%s,%d,%d)' %(ip, p1, p2)) conn, addr = sock.accept() - self.dtp = DummyDTPHandler(conn, baseclass=self) + self.dtp = self.dtp_handler(conn, baseclass=self) def cmd_eprt(self, arg): af, ip, port = arg.split(arg[0])[1:-1] port = int(port) - s = socket.create_connection((ip, port), timeout=2) - self.dtp = DummyDTPHandler(s, baseclass=self) + s = socket.create_connection((ip, port), timeout=10) + self.dtp = self.dtp_handler(s, baseclass=self) self.push('200 active data connection established') def cmd_epsv(self, arg): sock = socket.socket(socket.AF_INET6) sock.bind((self.socket.getsockname()[0], 0)) sock.listen(5) - sock.settimeout(2) + sock.settimeout(10) port = sock.getsockname()[1] self.push('229 entering extended passive mode (|||%d|)' %port) conn, addr = sock.accept() - self.dtp = DummyDTPHandler(conn, baseclass=self) + self.dtp = self.dtp_handler(conn, baseclass=self) def cmd_echo(self, arg): # sends back the received string (used by the test suite) @@ -160,10 +179,19 @@ class DummyFTPHandler(asynchat.async_chat): def cmd_stor(self, arg): self.push('125 stor ok') + def cmd_rest(self, arg): + self.rest = arg + self.push('350 rest ok') + def cmd_retr(self, arg): self.push('125 retr ok') - self.dtp.push(RETR_DATA) + if self.rest is not None: + offset = int(self.rest) + else: + offset = 0 + self.dtp.push(RETR_DATA[offset:]) self.dtp.close_when_done() + self.rest = None def cmd_list(self, arg): self.push('125 list ok') @@ -226,12 +254,156 @@ class DummyFTPServer(asyncore.dispatcher, threading.Thread): raise +if ssl is not None: + + CERTFILE = os.path.join(os.path.dirname(__file__), "keycert.pem") + + class SSLConnection(object, asyncore.dispatcher): + """An asyncore.dispatcher subclass supporting TLS/SSL.""" + + _ssl_accepting = False + _ssl_closing = False + + def secure_connection(self): + self.socket = ssl.wrap_socket(self.socket, suppress_ragged_eofs=False, + certfile=CERTFILE, server_side=True, + do_handshake_on_connect=False, + ssl_version=ssl.PROTOCOL_SSLv23) + self._ssl_accepting = True + + def _do_ssl_handshake(self): + try: + self.socket.do_handshake() + except ssl.SSLError, err: + if err.args[0] in (ssl.SSL_ERROR_WANT_READ, + ssl.SSL_ERROR_WANT_WRITE): + return + elif err.args[0] == ssl.SSL_ERROR_EOF: + return self.handle_close() + raise + except socket.error, err: + if err.args[0] == errno.ECONNABORTED: + return self.handle_close() + else: + self._ssl_accepting = False + + def _do_ssl_shutdown(self): + self._ssl_closing = True + try: + self.socket = self.socket.unwrap() + except ssl.SSLError, err: + if err.args[0] in (ssl.SSL_ERROR_WANT_READ, + ssl.SSL_ERROR_WANT_WRITE): + return + except socket.error, err: + # Any "socket error" corresponds to a SSL_ERROR_SYSCALL return + # from OpenSSL's SSL_shutdown(), corresponding to a + # closed socket condition. See also: + # http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html + pass + self._ssl_closing = False + super(SSLConnection, self).close() + + def handle_read_event(self): + if self._ssl_accepting: + self._do_ssl_handshake() + elif self._ssl_closing: + self._do_ssl_shutdown() + else: + super(SSLConnection, self).handle_read_event() + + def handle_write_event(self): + if self._ssl_accepting: + self._do_ssl_handshake() + elif self._ssl_closing: + self._do_ssl_shutdown() + else: + super(SSLConnection, self).handle_write_event() + + def send(self, data): + try: + return super(SSLConnection, self).send(data) + except ssl.SSLError, err: + if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN, + ssl.SSL_ERROR_WANT_READ, + ssl.SSL_ERROR_WANT_WRITE): + return 0 + raise + + def recv(self, buffer_size): + try: + return super(SSLConnection, self).recv(buffer_size) + except ssl.SSLError, err: + if err.args[0] in (ssl.SSL_ERROR_WANT_READ, + ssl.SSL_ERROR_WANT_WRITE): + return '' + if err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN): + self.handle_close() + return '' + raise + + def handle_error(self): + raise + + def close(self): + if (isinstance(self.socket, ssl.SSLSocket) and + self.socket._sslobj is not None): + self._do_ssl_shutdown() + + + class DummyTLS_DTPHandler(SSLConnection, DummyDTPHandler): + """A DummyDTPHandler subclass supporting TLS/SSL.""" + + def __init__(self, conn, baseclass): + DummyDTPHandler.__init__(self, conn, baseclass) + if self.baseclass.secure_data_channel: + self.secure_connection() + + + class DummyTLS_FTPHandler(SSLConnection, DummyFTPHandler): + """A DummyFTPHandler subclass supporting TLS/SSL.""" + + dtp_handler = DummyTLS_DTPHandler + + def __init__(self, conn): + DummyFTPHandler.__init__(self, conn) + self.secure_data_channel = False + + def cmd_auth(self, line): + """Set up secure control channel.""" + self.push('234 AUTH TLS successful') + self.secure_connection() + + def cmd_pbsz(self, line): + """Negotiate size of buffer for secure data transfer. + For TLS/SSL the only valid value for the parameter is '0'. + Any other value is accepted but ignored. + """ + self.push('200 PBSZ=0 successful.') + + def cmd_prot(self, line): + """Setup un/secure data channel.""" + arg = line.upper() + if arg == 'C': + self.push('200 Protection set to Clear') + self.secure_data_channel = False + elif arg == 'P': + self.push('200 Protection set to Private') + self.secure_data_channel = True + else: + self.push("502 Unrecognized PROT type (use C or P).") + + + class DummyTLS_FTPServer(DummyFTPServer): + handler = DummyTLS_FTPHandler + + class TestFTPClass(TestCase): def setUp(self): self.server = DummyFTPServer((HOST, 0)) self.server.start() - self.client = ftplib.FTP(timeout=2) + self.client = ftplib.FTP(timeout=10) self.client.connect(self.server.host, self.server.port) def tearDown(self): @@ -316,6 +488,15 @@ class TestFTPClass(TestCase): self.client.retrbinary('retr', received.append) self.assertEqual(''.join(received), RETR_DATA) + def test_retrbinary_rest(self): + for rest in (0, 10, 20): + received = [] + self.client.retrbinary('retr', received.append, rest=rest) + self.assertEqual(''.join(received), RETR_DATA[rest:], + msg='rest test case %d %d %d' % (rest, + len(''.join(received)), + len(RETR_DATA[rest:]))) + def test_retrlines(self): received = [] self.client.retrlines('retr', received.append) @@ -331,6 +512,13 @@ class TestFTPClass(TestCase): self.client.storbinary('stor', f, callback=lambda x: flag.append(None)) self.assertTrue(flag) + def test_storbinary_rest(self): + f = StringIO.StringIO(RETR_DATA) + for r in (30, '30'): + f.seek(0) + self.client.storbinary('stor', f, rest=r) + self.assertEqual(self.server.handler.rest, str(r)) + def test_storlines(self): f = StringIO.StringIO(RETR_DATA.replace('\r\n', '\n')) self.client.storlines('stor', f) @@ -357,7 +545,7 @@ class TestFTPClass(TestCase): def test_makepasv(self): host, port = self.client.makepasv() - conn = socket.create_connection((host, port), 2) + conn = socket.create_connection((host, port), 10) conn.close() # IPv4 is in use, just make sure send_epsv has not been used self.assertEqual(self.server.handler.last_received_cmd, 'pasv') @@ -384,7 +572,7 @@ class TestIPv6Environment(TestCase): def test_makepasv(self): host, port = self.client.makepasv() - conn = socket.create_connection((host, port), 2) + conn = socket.create_connection((host, port), 10) conn.close() self.assertEqual(self.server.handler.last_received_cmd, 'epsv') @@ -399,6 +587,81 @@ class TestIPv6Environment(TestCase): retr() +class TestTLS_FTPClassMixin(TestFTPClass): + """Repeat TestFTPClass tests starting the TLS layer for both control + and data connections first. + """ + + def setUp(self): + self.server = DummyTLS_FTPServer((HOST, 0)) + self.server.start() + self.client = ftplib.FTP_TLS(timeout=10) + self.client.connect(self.server.host, self.server.port) + # enable TLS + self.client.auth() + self.client.prot_p() + + +class TestTLS_FTPClass(TestCase): + """Specific TLS_FTP class tests.""" + + def setUp(self): + self.server = DummyTLS_FTPServer((HOST, 0)) + self.server.start() + self.client = ftplib.FTP_TLS(timeout=10) + self.client.connect(self.server.host, self.server.port) + + def tearDown(self): + self.client.close() + self.server.stop() + + def test_control_connection(self): + self.assertNotIsInstance(self.client.sock, ssl.SSLSocket) + self.client.auth() + self.assertIsInstance(self.client.sock, ssl.SSLSocket) + + def test_data_connection(self): + # clear text + sock = self.client.transfercmd('list') + self.assertNotIsInstance(sock, ssl.SSLSocket) + sock.close() + self.assertEqual(self.client.voidresp(), "226 transfer complete") + + # secured, after PROT P + self.client.prot_p() + sock = self.client.transfercmd('list') + self.assertIsInstance(sock, ssl.SSLSocket) + sock.close() + self.assertEqual(self.client.voidresp(), "226 transfer complete") + + # PROT C is issued, the connection must be in cleartext again + self.client.prot_c() + sock = self.client.transfercmd('list') + self.assertNotIsInstance(sock, ssl.SSLSocket) + sock.close() + self.assertEqual(self.client.voidresp(), "226 transfer complete") + + def test_login(self): + # login() is supposed to implicitly secure the control connection + self.assertNotIsInstance(self.client.sock, ssl.SSLSocket) + self.client.login() + self.assertIsInstance(self.client.sock, ssl.SSLSocket) + # make sure that AUTH TLS doesn't get issued again + self.client.login() + + def test_auth_issued_twice(self): + self.client.auth() + self.assertRaises(ValueError, self.client.auth) + + def test_auth_ssl(self): + try: + self.client.ssl_version = ssl.PROTOCOL_SSLv3 + self.client.auth() + self.assertRaises(ValueError, self.client.auth) + finally: + self.client.ssl_version = ssl.PROTOCOL_TLSv1 + + class TestTimeouts(TestCase): def setUp(self): @@ -439,7 +702,7 @@ class TestTimeouts(TestCase): def testTimeoutDefault(self): # default -- use global socket timeout - self.assert_(socket.getdefaulttimeout() is None) + self.assertTrue(socket.getdefaulttimeout() is None) socket.setdefaulttimeout(30) try: ftp = ftplib.FTP("localhost") @@ -451,7 +714,7 @@ class TestTimeouts(TestCase): def testTimeoutNone(self): # no timeout -- do not use global socket timeout - self.assert_(socket.getdefaulttimeout() is None) + self.assertTrue(socket.getdefaulttimeout() is None) socket.setdefaulttimeout(30) try: ftp = ftplib.FTP("localhost", timeout=None) @@ -500,6 +763,10 @@ def test_main(): pass else: tests.append(TestIPv6Environment) + + if ssl is not None: + tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass]) + thread_info = test_support.threading_setup() try: test_support.run_unittest(*tests) |