diff options
Diffstat (limited to 'Lib/test/test_httpservers.py')
-rw-r--r-- | Lib/test/test_httpservers.py | 359 |
1 files changed, 234 insertions, 125 deletions
diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py index 28256975a0..e83c048ed9 100644 --- a/Lib/test/test_httpservers.py +++ b/Lib/test/test_httpservers.py @@ -16,12 +16,11 @@ import shutil import urllib.parse import http.client import tempfile -import threading +from io import BytesIO import unittest - -from io import BytesIO from test import support +threading = support.import_module('threading') class NoLogRequestHandler: def log_message(self, *args): @@ -32,21 +31,6 @@ class NoLogRequestHandler: return '' -class SocketlessRequestHandler(SimpleHTTPRequestHandler): - def __init__(self): - self.get_called = False - self.protocol_version = "HTTP/1.1" - - def do_GET(self): - self.get_called = True - self.send_response(200) - self.send_header('Content-Type', 'text/html') - self.end_headers() - self.wfile.write(b'<html><body>Data</body></html>\r\n') - - def log_message(self, format, *args): - pass - class TestServerThread(threading.Thread): def __init__(self, test_object, request_handler): threading.Thread.__init__(self) @@ -54,8 +38,8 @@ class TestServerThread(threading.Thread): self.test_object = test_object def run(self): - self.server = HTTPServer(('', 0), self.request_handler) - self.test_object.PORT = self.server.socket.getsockname()[1] + self.server = HTTPServer(('localhost', 0), self.request_handler) + self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname() self.test_object.server_started.set() self.test_object = None try: @@ -69,6 +53,8 @@ class TestServerThread(threading.Thread): class BaseTestCase(unittest.TestCase): def setUp(self): + self._threads = support.threading_setup() + os.environ = support.EnvironmentVarGuard() self.server_started = threading.Event() self.thread = TestServerThread(self, self.request_handler) self.thread.start() @@ -76,81 +62,14 @@ class BaseTestCase(unittest.TestCase): def tearDown(self): self.thread.stop() + os.environ.__exit__() + support.threading_cleanup(*self._threads) def request(self, uri, method='GET', body=None, headers={}): - self.connection = http.client.HTTPConnection('localhost', self.PORT) + self.connection = http.client.HTTPConnection(self.HOST, self.PORT) self.connection.request(method, uri, body, headers) return self.connection.getresponse() -class BaseHTTPRequestHandlerTestCase(unittest.TestCase): - """Test the functionality of the BaseHTTPServer.""" - - HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK') - - def setUp (self): - self.handler = SocketlessRequestHandler() - - def send_typical_request(self, message): - input = BytesIO(message) - output = BytesIO() - self.handler.rfile = input - self.handler.wfile = output - self.handler.handle_one_request() - output.seek(0) - return output.readlines() - - def verify_get_called(self): - self.assertTrue(self.handler.get_called) - - def verify_expected_headers(self, headers): - for fieldName in b'Server: ', b'Date: ', b'Content-Type: ': - self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) - - def verify_http_server_response(self, response): - match = self.HTTPResponseMatch.search(response) - self.assertTrue(match is not None) - - def test_http_1_1(self): - result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n') - self.verify_http_server_response(result[0]) - self.verify_expected_headers(result[1:-1]) - self.verify_get_called() - self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') - - def test_http_1_0(self): - result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n') - self.verify_http_server_response(result[0]) - self.verify_expected_headers(result[1:-1]) - self.verify_get_called() - self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') - - def test_http_0_9(self): - result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n') - self.assertEqual(len(result), 1) - self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n') - self.verify_get_called() - - def test_with_continue_1_0(self): - result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') - self.verify_http_server_response(result[0]) - self.verify_expected_headers(result[1:-1]) - self.verify_get_called() - self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') - - def test_request_length(self): - # Issue #10714: huge request lines are discarded, to avoid Denial - # of Service attacks. - result = self.send_typical_request(b'GET ' + b'x' * 65537) - self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') - self.assertFalse(self.handler.get_called) - - def test_header_length(self): - # Issue #6791: same for headers - result = self.send_typical_request( - b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n') - self.assertEqual(result[0], b'HTTP/1.1 400 Line too long\r\n') - self.assertFalse(self.handler.get_called) - class BaseHTTPServerTestCase(BaseTestCase): class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): @@ -178,9 +97,17 @@ class BaseHTTPServerTestCase(BaseTestCase): self.send_header('Connection', 'close') self.end_headers() + def do_LATINONEHEADER(self): + self.send_response(999) + self.send_header('X-Special', 'Dängerous Mind') + self.send_header('Connection', 'close') + self.end_headers() + body = self.headers['x-special-incoming'].encode('utf-8') + self.wfile.write(body) + def setUp(self): BaseTestCase.setUp(self) - self.con = http.client.HTTPConnection('localhost', self.PORT) + self.con = http.client.HTTPConnection(self.HOST, self.PORT) self.con.connect() def test_command(self): @@ -263,6 +190,7 @@ class BaseHTTPServerTestCase(BaseTestCase): res = self.con.getresponse() self.assertEqual(res.getheader('Connection'), 'keep-alive') self.con.request('TEST', '/') + self.addCleanup(self.con.close) def test_internal_key_error(self): self.con.request('KEYERROR', '/') @@ -274,6 +202,14 @@ class BaseHTTPServerTestCase(BaseTestCase): res = self.con.getresponse() self.assertEqual(res.status, 999) + def test_latin1_header(self): + self.con.request('LATINONEHEADER', '/', headers={ + 'X-Special-Incoming': 'Ärger mit Unicode' + }) + res = self.con.getresponse() + self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind') + self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8')) + class SimpleHTTPServerTestCase(BaseTestCase): class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): @@ -287,9 +223,8 @@ class SimpleHTTPServerTestCase(BaseTestCase): self.data = b'We are the knights who say Ni!' self.tempdir = tempfile.mkdtemp(dir=basetempdir) self.tempdir_name = os.path.basename(self.tempdir) - temp = open(os.path.join(self.tempdir, 'test'), 'wb') - temp.write(self.data) - temp.close() + with open(os.path.join(self.tempdir, 'test'), 'wb') as temp: + temp.write(self.data) def tearDown(self): try: @@ -305,7 +240,7 @@ class SimpleHTTPServerTestCase(BaseTestCase): body = response.read() self.assertTrue(response) self.assertEqual(response.status, status) - self.assertTrue(response.reason != None) + self.assertIsNotNone(response.reason) if data: self.assertEqual(data, body) @@ -321,15 +256,16 @@ class SimpleHTTPServerTestCase(BaseTestCase): self.check_status_and_reason(response, 404) response = self.request('/' + 'ThisDoesNotExist' + '/') self.check_status_and_reason(response, 404) - f = open(os.path.join(self.tempdir_name, 'index.html'), 'w') - response = self.request('/' + self.tempdir_name + '/') - self.check_status_and_reason(response, 200) - if os.name == 'posix': - # chmod won't work as expected on Windows platforms - os.chmod(self.tempdir, 0) - response = self.request(self.tempdir_name + '/') - self.check_status_and_reason(response, 404) - os.chmod(self.tempdir, 0o755) + with open(os.path.join(self.tempdir_name, 'index.html'), 'w') as f: + response = self.request('/' + self.tempdir_name + '/') + self.check_status_and_reason(response, 200) + # chmod() doesn't work as expected on Windows, and filesystem + # permissions are ignored by root on Unix. + if os.name == 'posix' and os.geteuid() != 0: + os.chmod(self.tempdir, 0) + response = self.request(self.tempdir_name + '/') + self.check_status_and_reason(response, 404) + os.chmod(self.tempdir, 0o755) def test_head(self): response = self.request( @@ -366,39 +302,54 @@ print("Content-type: text/html") print() form = cgi.FieldStorage() -print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\ - form.getfirst("bacon"))) +print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), + form.getfirst("bacon"))) """ + +@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, + "This test can't be run reliably as root (issue #13308).") class CGIHTTPServerTestCase(BaseTestCase): class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): pass def setUp(self): BaseTestCase.setUp(self) + self.cwd = os.getcwd() self.parent_dir = tempfile.mkdtemp() self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') os.mkdir(self.cgi_dir) + self.file1_path = None + self.file2_path = None # The shebang line should be pure ASCII: use symlink if possible. # See issue #7668. - if hasattr(os, 'symlink'): + if support.can_symlink(): self.pythonexe = os.path.join(self.parent_dir, 'python') os.symlink(sys.executable, self.pythonexe) else: self.pythonexe = sys.executable + try: + # The python executable path is written as the first line of the + # CGI Python script. The encoding cookie cannot be used, and so the + # path should be encodable to the default script encoding (utf-8) + self.pythonexe.encode('utf-8') + except UnicodeEncodeError: + self.tearDown() + raise self.skipTest( + "Python executable path is not encodable to utf-8") + self.file1_path = os.path.join(self.cgi_dir, 'file1.py') - with open(self.file1_path, 'w') as file1: + with open(self.file1_path, 'w', encoding='utf-8') as file1: file1.write(cgi_file1 % self.pythonexe) os.chmod(self.file1_path, 0o777) self.file2_path = os.path.join(self.cgi_dir, 'file2.py') - with open(self.file2_path, 'w') as file2: + with open(self.file2_path, 'w', encoding='utf-8') as file2: file2.write(cgi_file2 % self.pythonexe) os.chmod(self.file2_path, 0o777) - self.cwd = os.getcwd() os.chdir(self.parent_dir) def tearDown(self): @@ -406,8 +357,10 @@ class CGIHTTPServerTestCase(BaseTestCase): os.chdir(self.cwd) if self.pythonexe != sys.executable: os.remove(self.pythonexe) - os.remove(self.file1_path) - os.remove(self.file2_path) + if self.file1_path: + os.remove(self.file1_path) + if self.file2_path: + os.remove(self.file2_path) os.rmdir(self.cgi_dir) os.rmdir(self.parent_dir) finally: @@ -449,13 +402,13 @@ class CGIHTTPServerTestCase(BaseTestCase): else: actual = server._url_collapse_path_split(path) self.assertEqual(expected, actual, - msg='path = %r\nGot: %r\nWanted: %r' % ( - path, actual, expected)) + msg='path = %r\nGot: %r\nWanted: %r' % + (path, actual, expected)) def test_headers_and_content(self): res = self.request('/cgi-bin/file1.py') - self.assertEqual((b'Hello World\n', 'text/html', 200), \ - (res.read(), res.getheader('Content-type'), res.status)) + self.assertEqual((b'Hello World\n', 'text/html', 200), + (res.read(), res.getheader('Content-type'), res.status)) def test_post(self): params = urllib.parse.urlencode( @@ -474,8 +427,8 @@ class CGIHTTPServerTestCase(BaseTestCase): headers = {b'Authorization' : b'Basic ' + base64.b64encode(b'username:pass')} res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) - self.assertEqual((b'Hello World\n', 'text/html', 200), \ - (res.read(), res.getheader('Content-type'), res.status)) + self.assertEqual((b'Hello World\n', 'text/html', 200), + (res.read(), res.getheader('Content-type'), res.status)) def test_no_leading_slash(self): # http://bugs.python.org/issue2254 @@ -492,6 +445,161 @@ class CGIHTTPServerTestCase(BaseTestCase): self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) +class SocketlessRequestHandler(SimpleHTTPRequestHandler): + def __init__(self): + self.get_called = False + self.protocol_version = "HTTP/1.1" + + def do_GET(self): + self.get_called = True + self.send_response(200) + self.send_header('Content-Type', 'text/html') + self.end_headers() + self.wfile.write(b'<html><body>Data</body></html>\r\n') + + def log_message(self, format, *args): + pass + +class RejectingSocketlessRequestHandler(SocketlessRequestHandler): + def handle_expect_100(self): + self.send_error(417) + return False + +class BaseHTTPRequestHandlerTestCase(unittest.TestCase): + """Test the functionality of the BaseHTTPServer. + + Test the support for the Expect 100-continue header. + """ + + HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK') + + def setUp (self): + self.handler = SocketlessRequestHandler() + + def send_typical_request(self, message): + input = BytesIO(message) + output = BytesIO() + self.handler.rfile = input + self.handler.wfile = output + self.handler.handle_one_request() + output.seek(0) + return output.readlines() + + def verify_get_called(self): + self.assertTrue(self.handler.get_called) + + def verify_expected_headers(self, headers): + for fieldName in b'Server: ', b'Date: ', b'Content-Type: ': + self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) + + def verify_http_server_response(self, response): + match = self.HTTPResponseMatch.search(response) + self.assertTrue(match is not None) + + def test_http_1_1(self): + result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n') + self.verify_http_server_response(result[0]) + self.verify_expected_headers(result[1:-1]) + self.verify_get_called() + self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') + + def test_http_1_0(self): + result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n') + self.verify_http_server_response(result[0]) + self.verify_expected_headers(result[1:-1]) + self.verify_get_called() + self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') + + def test_http_0_9(self): + result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n') + self.assertEqual(len(result), 1) + self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n') + self.verify_get_called() + + def test_with_continue_1_0(self): + result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') + self.verify_http_server_response(result[0]) + self.verify_expected_headers(result[1:-1]) + self.verify_get_called() + self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') + + def test_with_continue_1_1(self): + result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') + self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n') + self.assertEqual(result[1], b'HTTP/1.1 200 OK\r\n') + self.verify_expected_headers(result[2:-1]) + self.verify_get_called() + self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') + + def test_header_buffering(self): + + def _readAndReseek(f): + pos = f.tell() + f.seek(0) + data = f.read() + f.seek(pos) + return data + + input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') + output = BytesIO() + self.handler.rfile = input + self.handler.wfile = output + self.handler.request_version = 'HTTP/1.1' + + self.handler.send_header('Foo', 'foo') + self.handler.send_header('bar', 'bar') + self.assertEqual(_readAndReseek(output), b'') + self.handler.end_headers() + self.assertEqual(_readAndReseek(output), + b'Foo: foo\r\nbar: bar\r\n\r\n') + + def test_header_unbuffered_when_continue(self): + + def _readAndReseek(f): + pos = f.tell() + f.seek(0) + data = f.read() + f.seek(pos) + return data + + input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') + output = BytesIO() + self.handler.rfile = input + self.handler.wfile = output + self.handler.request_version = 'HTTP/1.1' + + self.handler.handle_one_request() + self.assertNotEqual(_readAndReseek(output), b'') + result = _readAndReseek(output).split(b'\r\n') + self.assertEqual(result[0], b'HTTP/1.1 100 Continue') + self.assertEqual(result[1], b'HTTP/1.1 200 OK') + + def test_with_continue_rejected(self): + usual_handler = self.handler # Save to avoid breaking any subsequent tests. + self.handler = RejectingSocketlessRequestHandler() + result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') + self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n') + self.verify_expected_headers(result[1:-1]) + # The expect handler should short circuit the usual get method by + # returning false here, so get_called should be false + self.assertFalse(self.handler.get_called) + self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1) + self.handler = usual_handler # Restore to avoid breaking any subsequent tests. + + def test_request_length(self): + # Issue #10714: huge request lines are discarded, to avoid Denial + # of Service attacks. + result = self.send_typical_request(b'GET ' + b'x' * 65537) + self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') + self.assertFalse(self.handler.get_called) + + def test_header_length(self): + # Issue #6791: same for headers + result = self.send_typical_request( + b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n') + self.assertEqual(result[0], b'HTTP/1.1 400 Line too long\r\n') + self.assertFalse(self.handler.get_called) + class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): """ Test url parsing """ def setUp(self): @@ -515,14 +623,15 @@ class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): def test_main(verbose=None): + cwd = os.getcwd() try: - cwd = os.getcwd() - support.run_unittest(BaseHTTPRequestHandlerTestCase, - SimpleHTTPRequestHandlerTestCase, - BaseHTTPServerTestCase, - SimpleHTTPServerTestCase, - CGIHTTPServerTestCase - ) + support.run_unittest( + BaseHTTPRequestHandlerTestCase, + BaseHTTPServerTestCase, + SimpleHTTPServerTestCase, + CGIHTTPServerTestCase, + SimpleHTTPRequestHandlerTestCase, + ) finally: os.chdir(cwd) |