summaryrefslogtreecommitdiff
path: root/Lib/test/test_httpservers.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_httpservers.py')
-rw-r--r--Lib/test/test_httpservers.py359
1 files changed, 234 insertions, 125 deletions
diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py
index 28256975a0..e83c048ed9 100644
--- a/Lib/test/test_httpservers.py
+++ b/Lib/test/test_httpservers.py
@@ -16,12 +16,11 @@ import shutil
import urllib.parse
import http.client
import tempfile
-import threading
+from io import BytesIO
import unittest
-
-from io import BytesIO
from test import support
+threading = support.import_module('threading')
class NoLogRequestHandler:
def log_message(self, *args):
@@ -32,21 +31,6 @@ class NoLogRequestHandler:
return ''
-class SocketlessRequestHandler(SimpleHTTPRequestHandler):
- def __init__(self):
- self.get_called = False
- self.protocol_version = "HTTP/1.1"
-
- def do_GET(self):
- self.get_called = True
- self.send_response(200)
- self.send_header('Content-Type', 'text/html')
- self.end_headers()
- self.wfile.write(b'<html><body>Data</body></html>\r\n')
-
- def log_message(self, format, *args):
- pass
-
class TestServerThread(threading.Thread):
def __init__(self, test_object, request_handler):
threading.Thread.__init__(self)
@@ -54,8 +38,8 @@ class TestServerThread(threading.Thread):
self.test_object = test_object
def run(self):
- self.server = HTTPServer(('', 0), self.request_handler)
- self.test_object.PORT = self.server.socket.getsockname()[1]
+ self.server = HTTPServer(('localhost', 0), self.request_handler)
+ self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname()
self.test_object.server_started.set()
self.test_object = None
try:
@@ -69,6 +53,8 @@ class TestServerThread(threading.Thread):
class BaseTestCase(unittest.TestCase):
def setUp(self):
+ self._threads = support.threading_setup()
+ os.environ = support.EnvironmentVarGuard()
self.server_started = threading.Event()
self.thread = TestServerThread(self, self.request_handler)
self.thread.start()
@@ -76,81 +62,14 @@ class BaseTestCase(unittest.TestCase):
def tearDown(self):
self.thread.stop()
+ os.environ.__exit__()
+ support.threading_cleanup(*self._threads)
def request(self, uri, method='GET', body=None, headers={}):
- self.connection = http.client.HTTPConnection('localhost', self.PORT)
+ self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
self.connection.request(method, uri, body, headers)
return self.connection.getresponse()
-class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
- """Test the functionality of the BaseHTTPServer."""
-
- HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')
-
- def setUp (self):
- self.handler = SocketlessRequestHandler()
-
- def send_typical_request(self, message):
- input = BytesIO(message)
- output = BytesIO()
- self.handler.rfile = input
- self.handler.wfile = output
- self.handler.handle_one_request()
- output.seek(0)
- return output.readlines()
-
- def verify_get_called(self):
- self.assertTrue(self.handler.get_called)
-
- def verify_expected_headers(self, headers):
- for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
- self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
-
- def verify_http_server_response(self, response):
- match = self.HTTPResponseMatch.search(response)
- self.assertTrue(match is not None)
-
- def test_http_1_1(self):
- result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
- self.verify_http_server_response(result[0])
- self.verify_expected_headers(result[1:-1])
- self.verify_get_called()
- self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
-
- def test_http_1_0(self):
- result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
- self.verify_http_server_response(result[0])
- self.verify_expected_headers(result[1:-1])
- self.verify_get_called()
- self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
-
- def test_http_0_9(self):
- result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
- self.assertEqual(len(result), 1)
- self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
- self.verify_get_called()
-
- def test_with_continue_1_0(self):
- result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
- self.verify_http_server_response(result[0])
- self.verify_expected_headers(result[1:-1])
- self.verify_get_called()
- self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
-
- def test_request_length(self):
- # Issue #10714: huge request lines are discarded, to avoid Denial
- # of Service attacks.
- result = self.send_typical_request(b'GET ' + b'x' * 65537)
- self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
- self.assertFalse(self.handler.get_called)
-
- def test_header_length(self):
- # Issue #6791: same for headers
- result = self.send_typical_request(
- b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
- self.assertEqual(result[0], b'HTTP/1.1 400 Line too long\r\n')
- self.assertFalse(self.handler.get_called)
-
class BaseHTTPServerTestCase(BaseTestCase):
class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
@@ -178,9 +97,17 @@ class BaseHTTPServerTestCase(BaseTestCase):
self.send_header('Connection', 'close')
self.end_headers()
+ def do_LATINONEHEADER(self):
+ self.send_response(999)
+ self.send_header('X-Special', 'Dängerous Mind')
+ self.send_header('Connection', 'close')
+ self.end_headers()
+ body = self.headers['x-special-incoming'].encode('utf-8')
+ self.wfile.write(body)
+
def setUp(self):
BaseTestCase.setUp(self)
- self.con = http.client.HTTPConnection('localhost', self.PORT)
+ self.con = http.client.HTTPConnection(self.HOST, self.PORT)
self.con.connect()
def test_command(self):
@@ -263,6 +190,7 @@ class BaseHTTPServerTestCase(BaseTestCase):
res = self.con.getresponse()
self.assertEqual(res.getheader('Connection'), 'keep-alive')
self.con.request('TEST', '/')
+ self.addCleanup(self.con.close)
def test_internal_key_error(self):
self.con.request('KEYERROR', '/')
@@ -274,6 +202,14 @@ class BaseHTTPServerTestCase(BaseTestCase):
res = self.con.getresponse()
self.assertEqual(res.status, 999)
+ def test_latin1_header(self):
+ self.con.request('LATINONEHEADER', '/', headers={
+ 'X-Special-Incoming': 'Ärger mit Unicode'
+ })
+ res = self.con.getresponse()
+ self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
+ self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))
+
class SimpleHTTPServerTestCase(BaseTestCase):
class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
@@ -287,9 +223,8 @@ class SimpleHTTPServerTestCase(BaseTestCase):
self.data = b'We are the knights who say Ni!'
self.tempdir = tempfile.mkdtemp(dir=basetempdir)
self.tempdir_name = os.path.basename(self.tempdir)
- temp = open(os.path.join(self.tempdir, 'test'), 'wb')
- temp.write(self.data)
- temp.close()
+ with open(os.path.join(self.tempdir, 'test'), 'wb') as temp:
+ temp.write(self.data)
def tearDown(self):
try:
@@ -305,7 +240,7 @@ class SimpleHTTPServerTestCase(BaseTestCase):
body = response.read()
self.assertTrue(response)
self.assertEqual(response.status, status)
- self.assertTrue(response.reason != None)
+ self.assertIsNotNone(response.reason)
if data:
self.assertEqual(data, body)
@@ -321,15 +256,16 @@ class SimpleHTTPServerTestCase(BaseTestCase):
self.check_status_and_reason(response, 404)
response = self.request('/' + 'ThisDoesNotExist' + '/')
self.check_status_and_reason(response, 404)
- f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
- response = self.request('/' + self.tempdir_name + '/')
- self.check_status_and_reason(response, 200)
- if os.name == 'posix':
- # chmod won't work as expected on Windows platforms
- os.chmod(self.tempdir, 0)
- response = self.request(self.tempdir_name + '/')
- self.check_status_and_reason(response, 404)
- os.chmod(self.tempdir, 0o755)
+ with open(os.path.join(self.tempdir_name, 'index.html'), 'w') as f:
+ response = self.request('/' + self.tempdir_name + '/')
+ self.check_status_and_reason(response, 200)
+ # chmod() doesn't work as expected on Windows, and filesystem
+ # permissions are ignored by root on Unix.
+ if os.name == 'posix' and os.geteuid() != 0:
+ os.chmod(self.tempdir, 0)
+ response = self.request(self.tempdir_name + '/')
+ self.check_status_and_reason(response, 404)
+ os.chmod(self.tempdir, 0o755)
def test_head(self):
response = self.request(
@@ -366,39 +302,54 @@ print("Content-type: text/html")
print()
form = cgi.FieldStorage()
-print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\
- form.getfirst("bacon")))
+print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
+ form.getfirst("bacon")))
"""
+
+@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
+ "This test can't be run reliably as root (issue #13308).")
class CGIHTTPServerTestCase(BaseTestCase):
class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
pass
def setUp(self):
BaseTestCase.setUp(self)
+ self.cwd = os.getcwd()
self.parent_dir = tempfile.mkdtemp()
self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
os.mkdir(self.cgi_dir)
+ self.file1_path = None
+ self.file2_path = None
# The shebang line should be pure ASCII: use symlink if possible.
# See issue #7668.
- if hasattr(os, 'symlink'):
+ if support.can_symlink():
self.pythonexe = os.path.join(self.parent_dir, 'python')
os.symlink(sys.executable, self.pythonexe)
else:
self.pythonexe = sys.executable
+ try:
+ # The python executable path is written as the first line of the
+ # CGI Python script. The encoding cookie cannot be used, and so the
+ # path should be encodable to the default script encoding (utf-8)
+ self.pythonexe.encode('utf-8')
+ except UnicodeEncodeError:
+ self.tearDown()
+ raise self.skipTest(
+ "Python executable path is not encodable to utf-8")
+
self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
- with open(self.file1_path, 'w') as file1:
+ with open(self.file1_path, 'w', encoding='utf-8') as file1:
file1.write(cgi_file1 % self.pythonexe)
os.chmod(self.file1_path, 0o777)
self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
- with open(self.file2_path, 'w') as file2:
+ with open(self.file2_path, 'w', encoding='utf-8') as file2:
file2.write(cgi_file2 % self.pythonexe)
os.chmod(self.file2_path, 0o777)
- self.cwd = os.getcwd()
os.chdir(self.parent_dir)
def tearDown(self):
@@ -406,8 +357,10 @@ class CGIHTTPServerTestCase(BaseTestCase):
os.chdir(self.cwd)
if self.pythonexe != sys.executable:
os.remove(self.pythonexe)
- os.remove(self.file1_path)
- os.remove(self.file2_path)
+ if self.file1_path:
+ os.remove(self.file1_path)
+ if self.file2_path:
+ os.remove(self.file2_path)
os.rmdir(self.cgi_dir)
os.rmdir(self.parent_dir)
finally:
@@ -449,13 +402,13 @@ class CGIHTTPServerTestCase(BaseTestCase):
else:
actual = server._url_collapse_path_split(path)
self.assertEqual(expected, actual,
- msg='path = %r\nGot: %r\nWanted: %r' % (
- path, actual, expected))
+ msg='path = %r\nGot: %r\nWanted: %r' %
+ (path, actual, expected))
def test_headers_and_content(self):
res = self.request('/cgi-bin/file1.py')
- self.assertEqual((b'Hello World\n', 'text/html', 200), \
- (res.read(), res.getheader('Content-type'), res.status))
+ self.assertEqual((b'Hello World\n', 'text/html', 200),
+ (res.read(), res.getheader('Content-type'), res.status))
def test_post(self):
params = urllib.parse.urlencode(
@@ -474,8 +427,8 @@ class CGIHTTPServerTestCase(BaseTestCase):
headers = {b'Authorization' : b'Basic ' +
base64.b64encode(b'username:pass')}
res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
- self.assertEqual((b'Hello World\n', 'text/html', 200), \
- (res.read(), res.getheader('Content-type'), res.status))
+ self.assertEqual((b'Hello World\n', 'text/html', 200),
+ (res.read(), res.getheader('Content-type'), res.status))
def test_no_leading_slash(self):
# http://bugs.python.org/issue2254
@@ -492,6 +445,161 @@ class CGIHTTPServerTestCase(BaseTestCase):
self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
+class SocketlessRequestHandler(SimpleHTTPRequestHandler):
+ def __init__(self):
+ self.get_called = False
+ self.protocol_version = "HTTP/1.1"
+
+ def do_GET(self):
+ self.get_called = True
+ self.send_response(200)
+ self.send_header('Content-Type', 'text/html')
+ self.end_headers()
+ self.wfile.write(b'<html><body>Data</body></html>\r\n')
+
+ def log_message(self, format, *args):
+ pass
+
+class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
+ def handle_expect_100(self):
+ self.send_error(417)
+ return False
+
+class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
+ """Test the functionality of the BaseHTTPServer.
+
+ Test the support for the Expect 100-continue header.
+ """
+
+ HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')
+
+ def setUp (self):
+ self.handler = SocketlessRequestHandler()
+
+ def send_typical_request(self, message):
+ input = BytesIO(message)
+ output = BytesIO()
+ self.handler.rfile = input
+ self.handler.wfile = output
+ self.handler.handle_one_request()
+ output.seek(0)
+ return output.readlines()
+
+ def verify_get_called(self):
+ self.assertTrue(self.handler.get_called)
+
+ def verify_expected_headers(self, headers):
+ for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
+ self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
+
+ def verify_http_server_response(self, response):
+ match = self.HTTPResponseMatch.search(response)
+ self.assertTrue(match is not None)
+
+ def test_http_1_1(self):
+ result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
+ self.verify_http_server_response(result[0])
+ self.verify_expected_headers(result[1:-1])
+ self.verify_get_called()
+ self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
+
+ def test_http_1_0(self):
+ result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
+ self.verify_http_server_response(result[0])
+ self.verify_expected_headers(result[1:-1])
+ self.verify_get_called()
+ self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
+
+ def test_http_0_9(self):
+ result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
+ self.assertEqual(len(result), 1)
+ self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
+ self.verify_get_called()
+
+ def test_with_continue_1_0(self):
+ result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
+ self.verify_http_server_response(result[0])
+ self.verify_expected_headers(result[1:-1])
+ self.verify_get_called()
+ self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
+
+ def test_with_continue_1_1(self):
+ result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
+ self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n')
+ self.assertEqual(result[1], b'HTTP/1.1 200 OK\r\n')
+ self.verify_expected_headers(result[2:-1])
+ self.verify_get_called()
+ self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
+
+ def test_header_buffering(self):
+
+ def _readAndReseek(f):
+ pos = f.tell()
+ f.seek(0)
+ data = f.read()
+ f.seek(pos)
+ return data
+
+ input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
+ output = BytesIO()
+ self.handler.rfile = input
+ self.handler.wfile = output
+ self.handler.request_version = 'HTTP/1.1'
+
+ self.handler.send_header('Foo', 'foo')
+ self.handler.send_header('bar', 'bar')
+ self.assertEqual(_readAndReseek(output), b'')
+ self.handler.end_headers()
+ self.assertEqual(_readAndReseek(output),
+ b'Foo: foo\r\nbar: bar\r\n\r\n')
+
+ def test_header_unbuffered_when_continue(self):
+
+ def _readAndReseek(f):
+ pos = f.tell()
+ f.seek(0)
+ data = f.read()
+ f.seek(pos)
+ return data
+
+ input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
+ output = BytesIO()
+ self.handler.rfile = input
+ self.handler.wfile = output
+ self.handler.request_version = 'HTTP/1.1'
+
+ self.handler.handle_one_request()
+ self.assertNotEqual(_readAndReseek(output), b'')
+ result = _readAndReseek(output).split(b'\r\n')
+ self.assertEqual(result[0], b'HTTP/1.1 100 Continue')
+ self.assertEqual(result[1], b'HTTP/1.1 200 OK')
+
+ def test_with_continue_rejected(self):
+ usual_handler = self.handler # Save to avoid breaking any subsequent tests.
+ self.handler = RejectingSocketlessRequestHandler()
+ result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
+ self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
+ self.verify_expected_headers(result[1:-1])
+ # The expect handler should short circuit the usual get method by
+ # returning false here, so get_called should be false
+ self.assertFalse(self.handler.get_called)
+ self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)
+ self.handler = usual_handler # Restore to avoid breaking any subsequent tests.
+
+ def test_request_length(self):
+ # Issue #10714: huge request lines are discarded, to avoid Denial
+ # of Service attacks.
+ result = self.send_typical_request(b'GET ' + b'x' * 65537)
+ self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
+ self.assertFalse(self.handler.get_called)
+
+ def test_header_length(self):
+ # Issue #6791: same for headers
+ result = self.send_typical_request(
+ b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
+ self.assertEqual(result[0], b'HTTP/1.1 400 Line too long\r\n')
+ self.assertFalse(self.handler.get_called)
+
class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
""" Test url parsing """
def setUp(self):
@@ -515,14 +623,15 @@ class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
def test_main(verbose=None):
+ cwd = os.getcwd()
try:
- cwd = os.getcwd()
- support.run_unittest(BaseHTTPRequestHandlerTestCase,
- SimpleHTTPRequestHandlerTestCase,
- BaseHTTPServerTestCase,
- SimpleHTTPServerTestCase,
- CGIHTTPServerTestCase
- )
+ support.run_unittest(
+ BaseHTTPRequestHandlerTestCase,
+ BaseHTTPServerTestCase,
+ SimpleHTTPServerTestCase,
+ CGIHTTPServerTestCase,
+ SimpleHTTPRequestHandlerTestCase,
+ )
finally:
os.chdir(cwd)