diff options
author | Tim Graham <timograham@gmail.com> | 2023-01-07 05:41:40 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-07 11:41:40 +0100 |
commit | 016bead6a23989adec5c7ee6948db7ce2fc5e89b (patch) | |
tree | 5eac2d287ff7a020ff44c48cc539339e028cfaeb /tests/requests_tests | |
parent | d5f892d873c6f98178121214f3c00d284ae1296d (diff) | |
download | django-016bead6a23989adec5c7ee6948db7ce2fc5e89b.tar.gz |
Renamed 'requests' test package.
This avoids a collision when third-party database backends depend on the
Requests HTTP library.
Diffstat (limited to 'tests/requests_tests')
-rw-r--r-- | tests/requests_tests/__init__.py | 0 | ||||
-rw-r--r-- | tests/requests_tests/test_accept_header.py | 103 | ||||
-rw-r--r-- | tests/requests_tests/test_data_upload_settings.py | 217 | ||||
-rw-r--r-- | tests/requests_tests/tests.py | 1098 |
4 files changed, 1418 insertions, 0 deletions
diff --git a/tests/requests_tests/__init__.py b/tests/requests_tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/requests_tests/__init__.py diff --git a/tests/requests_tests/test_accept_header.py b/tests/requests_tests/test_accept_header.py new file mode 100644 index 0000000000..c6eed0e479 --- /dev/null +++ b/tests/requests_tests/test_accept_header.py @@ -0,0 +1,103 @@ +from unittest import TestCase + +from django.http import HttpRequest +from django.http.request import MediaType + + +class MediaTypeTests(TestCase): + def test_empty(self): + for empty_media_type in (None, ""): + with self.subTest(media_type=empty_media_type): + media_type = MediaType(empty_media_type) + self.assertIs(media_type.is_all_types, False) + self.assertEqual(str(media_type), "") + self.assertEqual(repr(media_type), "<MediaType: >") + + def test_str(self): + self.assertEqual(str(MediaType("*/*; q=0.8")), "*/*; q=0.8") + self.assertEqual(str(MediaType("application/xml")), "application/xml") + + def test_repr(self): + self.assertEqual(repr(MediaType("*/*; q=0.8")), "<MediaType: */*; q=0.8>") + self.assertEqual( + repr(MediaType("application/xml")), + "<MediaType: application/xml>", + ) + + def test_is_all_types(self): + self.assertIs(MediaType("*/*").is_all_types, True) + self.assertIs(MediaType("*/*; q=0.8").is_all_types, True) + self.assertIs(MediaType("text/*").is_all_types, False) + self.assertIs(MediaType("application/xml").is_all_types, False) + + def test_match(self): + tests = [ + ("*/*; q=0.8", "*/*"), + ("*/*", "application/json"), + (" */* ", "application/json"), + ("application/*", "application/json"), + ("application/xml", "application/xml"), + (" application/xml ", "application/xml"), + ("application/xml", " application/xml "), + ] + for accepted_type, mime_type in tests: + with self.subTest(accepted_type, mime_type=mime_type): + self.assertIs(MediaType(accepted_type).match(mime_type), True) + + def test_no_match(self): + tests = [ + (None, "*/*"), + ("", "*/*"), + ("; q=0.8", "*/*"), + ("application/xml", "application/html"), + ("application/xml", "*/*"), + ] + for accepted_type, mime_type in tests: + with self.subTest(accepted_type, mime_type=mime_type): + self.assertIs(MediaType(accepted_type).match(mime_type), False) + + +class AcceptHeaderTests(TestCase): + def test_no_headers(self): + """Absence of Accept header defaults to '*/*'.""" + request = HttpRequest() + self.assertEqual( + [str(accepted_type) for accepted_type in request.accepted_types], + ["*/*"], + ) + + def test_accept_headers(self): + request = HttpRequest() + request.META[ + "HTTP_ACCEPT" + ] = "text/html, application/xhtml+xml,application/xml ;q=0.9,*/*;q=0.8" + self.assertEqual( + [str(accepted_type) for accepted_type in request.accepted_types], + [ + "text/html", + "application/xhtml+xml", + "application/xml; q=0.9", + "*/*; q=0.8", + ], + ) + + def test_request_accepts_any(self): + request = HttpRequest() + request.META["HTTP_ACCEPT"] = "*/*" + self.assertIs(request.accepts("application/json"), True) + + def test_request_accepts_none(self): + request = HttpRequest() + request.META["HTTP_ACCEPT"] = "" + self.assertIs(request.accepts("application/json"), False) + self.assertEqual(request.accepted_types, []) + + def test_request_accepts_some(self): + request = HttpRequest() + request.META[ + "HTTP_ACCEPT" + ] = "text/html,application/xhtml+xml,application/xml;q=0.9" + self.assertIs(request.accepts("text/html"), True) + self.assertIs(request.accepts("application/xhtml+xml"), True) + self.assertIs(request.accepts("application/xml"), True) + self.assertIs(request.accepts("application/json"), False) diff --git a/tests/requests_tests/test_data_upload_settings.py b/tests/requests_tests/test_data_upload_settings.py new file mode 100644 index 0000000000..0199296293 --- /dev/null +++ b/tests/requests_tests/test_data_upload_settings.py @@ -0,0 +1,217 @@ +from io import BytesIO + +from django.core.exceptions import RequestDataTooBig, TooManyFieldsSent +from django.core.handlers.wsgi import WSGIRequest +from django.test import SimpleTestCase +from django.test.client import FakePayload + +TOO_MANY_FIELDS_MSG = ( + "The number of GET/POST parameters exceeded settings.DATA_UPLOAD_MAX_NUMBER_FIELDS." +) +TOO_MUCH_DATA_MSG = "Request body exceeded settings.DATA_UPLOAD_MAX_MEMORY_SIZE." + + +class DataUploadMaxMemorySizeFormPostTests(SimpleTestCase): + def setUp(self): + payload = FakePayload("a=1&a=2&a=3\r\n") + self.request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + + def test_size_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=12): + with self.assertRaisesMessage(RequestDataTooBig, TOO_MUCH_DATA_MSG): + self.request._load_post_and_files() + + def test_size_not_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=13): + self.request._load_post_and_files() + + def test_no_limit(self): + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=None): + self.request._load_post_and_files() + + +class DataUploadMaxMemorySizeMultipartPostTests(SimpleTestCase): + def setUp(self): + payload = FakePayload( + "\r\n".join( + [ + "--boundary", + 'Content-Disposition: form-data; name="name"', + "", + "value", + "--boundary--", + ] + ) + ) + self.request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data; boundary=boundary", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + + def test_size_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=10): + with self.assertRaisesMessage(RequestDataTooBig, TOO_MUCH_DATA_MSG): + self.request._load_post_and_files() + + def test_size_not_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=11): + self.request._load_post_and_files() + + def test_no_limit(self): + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=None): + self.request._load_post_and_files() + + def test_file_passes(self): + payload = FakePayload( + "\r\n".join( + [ + "--boundary", + 'Content-Disposition: form-data; name="file1"; ' + 'filename="test.file"', + "", + "value", + "--boundary--", + ] + ) + ) + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data; boundary=boundary", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=1): + request._load_post_and_files() + self.assertIn("file1", request.FILES, "Upload file not present") + + +class DataUploadMaxMemorySizeGetTests(SimpleTestCase): + def setUp(self): + self.request = WSGIRequest( + { + "REQUEST_METHOD": "GET", + "wsgi.input": BytesIO(b""), + "CONTENT_LENGTH": 3, + } + ) + + def test_data_upload_max_memory_size_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=2): + with self.assertRaisesMessage(RequestDataTooBig, TOO_MUCH_DATA_MSG): + self.request.body + + def test_size_not_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=3): + self.request.body + + def test_no_limit(self): + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=None): + self.request.body + + def test_empty_content_length(self): + self.request.environ["CONTENT_LENGTH"] = "" + self.request.body + + +class DataUploadMaxNumberOfFieldsGet(SimpleTestCase): + def test_get_max_fields_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_NUMBER_FIELDS=1): + with self.assertRaisesMessage(TooManyFieldsSent, TOO_MANY_FIELDS_MSG): + request = WSGIRequest( + { + "REQUEST_METHOD": "GET", + "wsgi.input": BytesIO(b""), + "QUERY_STRING": "a=1&a=2&a=3", + } + ) + request.GET["a"] + + def test_get_max_fields_not_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_NUMBER_FIELDS=3): + request = WSGIRequest( + { + "REQUEST_METHOD": "GET", + "wsgi.input": BytesIO(b""), + "QUERY_STRING": "a=1&a=2&a=3", + } + ) + request.GET["a"] + + +class DataUploadMaxNumberOfFieldsMultipartPost(SimpleTestCase): + def setUp(self): + payload = FakePayload( + "\r\n".join( + [ + "--boundary", + 'Content-Disposition: form-data; name="name1"', + "", + "value1", + "--boundary", + 'Content-Disposition: form-data; name="name2"', + "", + "value2", + "--boundary--", + ] + ) + ) + self.request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data; boundary=boundary", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + + def test_number_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_NUMBER_FIELDS=1): + with self.assertRaisesMessage(TooManyFieldsSent, TOO_MANY_FIELDS_MSG): + self.request._load_post_and_files() + + def test_number_not_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_NUMBER_FIELDS=2): + self.request._load_post_and_files() + + def test_no_limit(self): + with self.settings(DATA_UPLOAD_MAX_NUMBER_FIELDS=None): + self.request._load_post_and_files() + + +class DataUploadMaxNumberOfFieldsFormPost(SimpleTestCase): + def setUp(self): + payload = FakePayload("\r\n".join(["a=1&a=2&a=3", ""])) + self.request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + + def test_number_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_NUMBER_FIELDS=2): + with self.assertRaisesMessage(TooManyFieldsSent, TOO_MANY_FIELDS_MSG): + self.request._load_post_and_files() + + def test_number_not_exceeded(self): + with self.settings(DATA_UPLOAD_MAX_NUMBER_FIELDS=3): + self.request._load_post_and_files() + + def test_no_limit(self): + with self.settings(DATA_UPLOAD_MAX_NUMBER_FIELDS=None): + self.request._load_post_and_files() diff --git a/tests/requests_tests/tests.py b/tests/requests_tests/tests.py new file mode 100644 index 0000000000..ef218afe2f --- /dev/null +++ b/tests/requests_tests/tests.py @@ -0,0 +1,1098 @@ +import pickle +from io import BytesIO +from itertools import chain +from urllib.parse import urlencode + +from django.core.exceptions import DisallowedHost +from django.core.handlers.wsgi import LimitedStream, WSGIRequest +from django.http import ( + HttpHeaders, + HttpRequest, + RawPostDataException, + UnreadablePostError, +) +from django.http.multipartparser import MultiPartParserError +from django.http.request import split_domain_port +from django.test import RequestFactory, SimpleTestCase, override_settings +from django.test.client import FakePayload + + +class RequestsTests(SimpleTestCase): + def test_httprequest(self): + request = HttpRequest() + self.assertEqual(list(request.GET), []) + self.assertEqual(list(request.POST), []) + self.assertEqual(list(request.COOKIES), []) + self.assertEqual(list(request.META), []) + + # .GET and .POST should be QueryDicts + self.assertEqual(request.GET.urlencode(), "") + self.assertEqual(request.POST.urlencode(), "") + + # and FILES should be MultiValueDict + self.assertEqual(request.FILES.getlist("foo"), []) + + self.assertIsNone(request.content_type) + self.assertIsNone(request.content_params) + + def test_httprequest_full_path(self): + request = HttpRequest() + request.path = "/;some/?awful/=path/foo:bar/" + request.path_info = "/prefix" + request.path + request.META["QUERY_STRING"] = ";some=query&+query=string" + expected = "/%3Bsome/%3Fawful/%3Dpath/foo:bar/?;some=query&+query=string" + self.assertEqual(request.get_full_path(), expected) + self.assertEqual(request.get_full_path_info(), "/prefix" + expected) + + def test_httprequest_full_path_with_query_string_and_fragment(self): + request = HttpRequest() + request.path = "/foo#bar" + request.path_info = "/prefix" + request.path + request.META["QUERY_STRING"] = "baz#quux" + self.assertEqual(request.get_full_path(), "/foo%23bar?baz#quux") + self.assertEqual(request.get_full_path_info(), "/prefix/foo%23bar?baz#quux") + + def test_httprequest_repr(self): + request = HttpRequest() + request.path = "/somepath/" + request.method = "GET" + request.GET = {"get-key": "get-value"} + request.POST = {"post-key": "post-value"} + request.COOKIES = {"post-key": "post-value"} + request.META = {"post-key": "post-value"} + self.assertEqual(repr(request), "<HttpRequest: GET '/somepath/'>") + + def test_httprequest_repr_invalid_method_and_path(self): + request = HttpRequest() + self.assertEqual(repr(request), "<HttpRequest>") + request = HttpRequest() + request.method = "GET" + self.assertEqual(repr(request), "<HttpRequest>") + request = HttpRequest() + request.path = "" + self.assertEqual(repr(request), "<HttpRequest>") + + def test_wsgirequest(self): + request = WSGIRequest( + { + "PATH_INFO": "bogus", + "REQUEST_METHOD": "bogus", + "CONTENT_TYPE": "text/html; charset=utf8", + "wsgi.input": BytesIO(b""), + } + ) + self.assertEqual(list(request.GET), []) + self.assertEqual(list(request.POST), []) + self.assertEqual(list(request.COOKIES), []) + self.assertEqual( + set(request.META), + { + "PATH_INFO", + "REQUEST_METHOD", + "SCRIPT_NAME", + "CONTENT_TYPE", + "wsgi.input", + }, + ) + self.assertEqual(request.META["PATH_INFO"], "bogus") + self.assertEqual(request.META["REQUEST_METHOD"], "bogus") + self.assertEqual(request.META["SCRIPT_NAME"], "") + self.assertEqual(request.content_type, "text/html") + self.assertEqual(request.content_params, {"charset": "utf8"}) + + def test_wsgirequest_with_script_name(self): + """ + The request's path is correctly assembled, regardless of whether or + not the SCRIPT_NAME has a trailing slash (#20169). + """ + # With trailing slash + request = WSGIRequest( + { + "PATH_INFO": "/somepath/", + "SCRIPT_NAME": "/PREFIX/", + "REQUEST_METHOD": "get", + "wsgi.input": BytesIO(b""), + } + ) + self.assertEqual(request.path, "/PREFIX/somepath/") + # Without trailing slash + request = WSGIRequest( + { + "PATH_INFO": "/somepath/", + "SCRIPT_NAME": "/PREFIX", + "REQUEST_METHOD": "get", + "wsgi.input": BytesIO(b""), + } + ) + self.assertEqual(request.path, "/PREFIX/somepath/") + + def test_wsgirequest_script_url_double_slashes(self): + """ + WSGI squashes multiple successive slashes in PATH_INFO, WSGIRequest + should take that into account when populating request.path and + request.META['SCRIPT_NAME'] (#17133). + """ + request = WSGIRequest( + { + "SCRIPT_URL": "/mst/milestones//accounts/login//help", + "PATH_INFO": "/milestones/accounts/login/help", + "REQUEST_METHOD": "get", + "wsgi.input": BytesIO(b""), + } + ) + self.assertEqual(request.path, "/mst/milestones/accounts/login/help") + self.assertEqual(request.META["SCRIPT_NAME"], "/mst") + + def test_wsgirequest_with_force_script_name(self): + """ + The FORCE_SCRIPT_NAME setting takes precedence over the request's + SCRIPT_NAME environment parameter (#20169). + """ + with override_settings(FORCE_SCRIPT_NAME="/FORCED_PREFIX/"): + request = WSGIRequest( + { + "PATH_INFO": "/somepath/", + "SCRIPT_NAME": "/PREFIX/", + "REQUEST_METHOD": "get", + "wsgi.input": BytesIO(b""), + } + ) + self.assertEqual(request.path, "/FORCED_PREFIX/somepath/") + + def test_wsgirequest_path_with_force_script_name_trailing_slash(self): + """ + The request's path is correctly assembled, regardless of whether or not + the FORCE_SCRIPT_NAME setting has a trailing slash (#20169). + """ + # With trailing slash + with override_settings(FORCE_SCRIPT_NAME="/FORCED_PREFIX/"): + request = WSGIRequest( + { + "PATH_INFO": "/somepath/", + "REQUEST_METHOD": "get", + "wsgi.input": BytesIO(b""), + } + ) + self.assertEqual(request.path, "/FORCED_PREFIX/somepath/") + # Without trailing slash + with override_settings(FORCE_SCRIPT_NAME="/FORCED_PREFIX"): + request = WSGIRequest( + { + "PATH_INFO": "/somepath/", + "REQUEST_METHOD": "get", + "wsgi.input": BytesIO(b""), + } + ) + self.assertEqual(request.path, "/FORCED_PREFIX/somepath/") + + def test_wsgirequest_repr(self): + request = WSGIRequest({"REQUEST_METHOD": "get", "wsgi.input": BytesIO(b"")}) + self.assertEqual(repr(request), "<WSGIRequest: GET '/'>") + request = WSGIRequest( + { + "PATH_INFO": "/somepath/", + "REQUEST_METHOD": "get", + "wsgi.input": BytesIO(b""), + } + ) + request.GET = {"get-key": "get-value"} + request.POST = {"post-key": "post-value"} + request.COOKIES = {"post-key": "post-value"} + request.META = {"post-key": "post-value"} + self.assertEqual(repr(request), "<WSGIRequest: GET '/somepath/'>") + + def test_wsgirequest_path_info(self): + def wsgi_str(path_info, encoding="utf-8"): + path_info = path_info.encode( + encoding + ) # Actual URL sent by the browser (bytestring) + path_info = path_info.decode( + "iso-8859-1" + ) # Value in the WSGI environ dict (native string) + return path_info + + # Regression for #19468 + request = WSGIRequest( + { + "PATH_INFO": wsgi_str("/سلام/"), + "REQUEST_METHOD": "get", + "wsgi.input": BytesIO(b""), + } + ) + self.assertEqual(request.path, "/سلام/") + + # The URL may be incorrectly encoded in a non-UTF-8 encoding (#26971) + request = WSGIRequest( + { + "PATH_INFO": wsgi_str("/café/", encoding="iso-8859-1"), + "REQUEST_METHOD": "get", + "wsgi.input": BytesIO(b""), + } + ) + # Since it's impossible to decide the (wrong) encoding of the URL, it's + # left percent-encoded in the path. + self.assertEqual(request.path, "/caf%E9/") + + def test_limited_stream(self): + # Read all of a limited stream + stream = LimitedStream(BytesIO(b"test"), 2) + self.assertEqual(stream.read(), b"te") + # Reading again returns nothing. + self.assertEqual(stream.read(), b"") + + # Read a number of characters greater than the stream has to offer + stream = LimitedStream(BytesIO(b"test"), 2) + self.assertEqual(stream.read(5), b"te") + # Reading again returns nothing. + self.assertEqual(stream.readline(5), b"") + + # Read sequentially from a stream + stream = LimitedStream(BytesIO(b"12345678"), 8) + self.assertEqual(stream.read(5), b"12345") + self.assertEqual(stream.read(5), b"678") + # Reading again returns nothing. + self.assertEqual(stream.readline(5), b"") + + # Read lines from a stream + stream = LimitedStream(BytesIO(b"1234\n5678\nabcd\nefgh\nijkl"), 24) + # Read a full line, unconditionally + self.assertEqual(stream.readline(), b"1234\n") + # Read a number of characters less than a line + self.assertEqual(stream.readline(2), b"56") + # Read the rest of the partial line + self.assertEqual(stream.readline(), b"78\n") + # Read a full line, with a character limit greater than the line length + self.assertEqual(stream.readline(6), b"abcd\n") + # Read the next line, deliberately terminated at the line end + self.assertEqual(stream.readline(4), b"efgh") + # Read the next line... just the line end + self.assertEqual(stream.readline(), b"\n") + # Read everything else. + self.assertEqual(stream.readline(), b"ijkl") + + # Regression for #15018 + # If a stream contains a newline, but the provided length + # is less than the number of provided characters, the newline + # doesn't reset the available character count + stream = LimitedStream(BytesIO(b"1234\nabcdef"), 9) + self.assertEqual(stream.readline(10), b"1234\n") + self.assertEqual(stream.readline(3), b"abc") + # Now expire the available characters + self.assertEqual(stream.readline(3), b"d") + # Reading again returns nothing. + self.assertEqual(stream.readline(2), b"") + + # Same test, but with read, not readline. + stream = LimitedStream(BytesIO(b"1234\nabcdef"), 9) + self.assertEqual(stream.read(6), b"1234\na") + self.assertEqual(stream.read(2), b"bc") + self.assertEqual(stream.read(2), b"d") + self.assertEqual(stream.read(2), b"") + self.assertEqual(stream.read(), b"") + + def test_stream_read(self): + payload = FakePayload("name=value") + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + }, + ) + self.assertEqual(request.read(), b"name=value") + + def test_stream_readline(self): + payload = FakePayload("name=value\nother=string") + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + }, + ) + self.assertEqual(request.readline(), b"name=value\n") + self.assertEqual(request.readline(), b"other=string") + + def test_read_after_value(self): + """ + Reading from request is allowed after accessing request contents as + POST or body. + """ + payload = FakePayload("name=value") + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + self.assertEqual(request.POST, {"name": ["value"]}) + self.assertEqual(request.body, b"name=value") + self.assertEqual(request.read(), b"name=value") + + def test_value_after_read(self): + """ + Construction of POST or body is not allowed after reading + from request. + """ + payload = FakePayload("name=value") + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + self.assertEqual(request.read(2), b"na") + with self.assertRaises(RawPostDataException): + request.body + self.assertEqual(request.POST, {}) + + def test_non_ascii_POST(self): + payload = FakePayload(urlencode({"key": "España"})) + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "wsgi.input": payload, + } + ) + self.assertEqual(request.POST, {"key": ["España"]}) + + def test_alternate_charset_POST(self): + """ + Test a POST with non-utf-8 payload encoding. + """ + payload = FakePayload(urlencode({"key": "España".encode("latin-1")})) + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": "application/x-www-form-urlencoded; charset=iso-8859-1", + "wsgi.input": payload, + } + ) + self.assertEqual(request.POST, {"key": ["España"]}) + + def test_body_after_POST_multipart_form_data(self): + """ + Reading body after parsing multipart/form-data is not allowed + """ + # Because multipart is used for large amounts of data i.e. file uploads, + # we don't want the data held in memory twice, and we don't want to + # silence the error by setting body = '' either. + payload = FakePayload( + "\r\n".join( + [ + "--boundary", + 'Content-Disposition: form-data; name="name"', + "", + "value", + "--boundary--", + ] + ) + ) + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data; boundary=boundary", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + self.assertEqual(request.POST, {"name": ["value"]}) + with self.assertRaises(RawPostDataException): + request.body + + def test_body_after_POST_multipart_related(self): + """ + Reading body after parsing multipart that isn't form-data is allowed + """ + # Ticket #9054 + # There are cases in which the multipart data is related instead of + # being a binary upload, in which case it should still be accessible + # via body. + payload_data = b"\r\n".join( + [ + b"--boundary", + b'Content-ID: id; name="name"', + b"", + b"value", + b"--boundary--", + ] + ) + payload = FakePayload(payload_data) + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/related; boundary=boundary", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + self.assertEqual(request.POST, {}) + self.assertEqual(request.body, payload_data) + + def test_POST_multipart_with_content_length_zero(self): + """ + Multipart POST requests with Content-Length >= 0 are valid and need to + be handled. + """ + # According to RFC 9110 Section 8.6 every POST with Content-Length >= 0 + # is a valid request, so ensure that we handle Content-Length == 0. + payload = FakePayload( + "\r\n".join( + [ + "--boundary", + 'Content-Disposition: form-data; name="name"', + "", + "value", + "--boundary--", + ] + ) + ) + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data; boundary=boundary", + "CONTENT_LENGTH": 0, + "wsgi.input": payload, + } + ) + self.assertEqual(request.POST, {}) + + def test_POST_binary_only(self): + payload = b"\r\n\x01\x00\x00\x00ab\x00\x00\xcd\xcc,@" + environ = { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/octet-stream", + "CONTENT_LENGTH": len(payload), + "wsgi.input": BytesIO(payload), + } + request = WSGIRequest(environ) + self.assertEqual(request.POST, {}) + self.assertEqual(request.FILES, {}) + self.assertEqual(request.body, payload) + + # Same test without specifying content-type + environ.update({"CONTENT_TYPE": "", "wsgi.input": BytesIO(payload)}) + request = WSGIRequest(environ) + self.assertEqual(request.POST, {}) + self.assertEqual(request.FILES, {}) + self.assertEqual(request.body, payload) + + def test_read_by_lines(self): + payload = FakePayload("name=value") + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + self.assertEqual(list(request), [b"name=value"]) + + def test_POST_after_body_read(self): + """ + POST should be populated even if body is read first + """ + payload = FakePayload("name=value") + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + request.body # evaluate + self.assertEqual(request.POST, {"name": ["value"]}) + + def test_POST_after_body_read_and_stream_read(self): + """ + POST should be populated even if body is read first, and then + the stream is read second. + """ + payload = FakePayload("name=value") + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + request.body # evaluate + self.assertEqual(request.read(1), b"n") + self.assertEqual(request.POST, {"name": ["value"]}) + + def test_POST_after_body_read_and_stream_read_multipart(self): + """ + POST should be populated even if body is read first, and then + the stream is read second. Using multipart/form-data instead of urlencoded. + """ + payload = FakePayload( + "\r\n".join( + [ + "--boundary", + 'Content-Disposition: form-data; name="name"', + "", + "value", + "--boundary--" "", + ] + ) + ) + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data; boundary=boundary", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + request.body # evaluate + # Consume enough data to mess up the parsing: + self.assertEqual(request.read(13), b"--boundary\r\nC") + self.assertEqual(request.POST, {"name": ["value"]}) + + def test_POST_immutable_for_multipart(self): + """ + MultiPartParser.parse() leaves request.POST immutable. + """ + payload = FakePayload( + "\r\n".join( + [ + "--boundary", + 'Content-Disposition: form-data; name="name"', + "", + "value", + "--boundary--", + ] + ) + ) + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data; boundary=boundary", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + self.assertFalse(request.POST._mutable) + + def test_multipart_without_boundary(self): + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data;", + "CONTENT_LENGTH": 0, + "wsgi.input": FakePayload(), + } + ) + with self.assertRaisesMessage( + MultiPartParserError, "Invalid boundary in multipart: None" + ): + request.POST + + def test_multipart_non_ascii_content_type(self): + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data; boundary = \xe0", + "CONTENT_LENGTH": 0, + "wsgi.input": FakePayload(), + } + ) + msg = ( + "Invalid non-ASCII Content-Type in multipart: multipart/form-data; " + "boundary = à" + ) + with self.assertRaisesMessage(MultiPartParserError, msg): + request.POST + + def test_POST_connection_error(self): + """ + If wsgi.input.read() raises an exception while trying to read() the + POST, the exception is identifiable (not a generic OSError). + """ + + class ExplodingBytesIO(BytesIO): + def read(self, size=-1, /): + raise OSError("kaboom!") + + payload = b"name=value" + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": ExplodingBytesIO(payload), + } + ) + with self.assertRaises(UnreadablePostError): + request.body + + def test_set_encoding_clears_POST(self): + payload = FakePayload("name=Hello Günter") + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "application/x-www-form-urlencoded", + "CONTENT_LENGTH": len(payload), + "wsgi.input": payload, + } + ) + self.assertEqual(request.POST, {"name": ["Hello Günter"]}) + request.encoding = "iso-8859-16" + self.assertEqual(request.POST, {"name": ["Hello GĂŒnter"]}) + + def test_set_encoding_clears_GET(self): + payload = FakePayload("") + request = WSGIRequest( + { + "REQUEST_METHOD": "GET", + "wsgi.input": payload, + "QUERY_STRING": "name=Hello%20G%C3%BCnter", + } + ) + self.assertEqual(request.GET, {"name": ["Hello Günter"]}) + request.encoding = "iso-8859-16" + self.assertEqual(request.GET, {"name": ["Hello G\u0102\u0152nter"]}) + + def test_FILES_connection_error(self): + """ + If wsgi.input.read() raises an exception while trying to read() the + FILES, the exception is identifiable (not a generic OSError). + """ + + class ExplodingBytesIO(BytesIO): + def read(self, size=-1, /): + raise OSError("kaboom!") + + payload = b"x" + request = WSGIRequest( + { + "REQUEST_METHOD": "POST", + "CONTENT_TYPE": "multipart/form-data; boundary=foo_", + "CONTENT_LENGTH": len(payload), + "wsgi.input": ExplodingBytesIO(payload), + } + ) + with self.assertRaises(UnreadablePostError): + request.FILES + + def test_pickling_request(self): + request = HttpRequest() + request.method = "GET" + request.path = "/testpath/" + request.META = { + "QUERY_STRING": ";some=query&+query=string", + "SERVER_NAME": "example.com", + "SERVER_PORT": 80, + } + request.COOKIES = {"post-key": "post-value"} + dump = pickle.dumps(request) + request_from_pickle = pickle.loads(dump) + self.assertEqual(repr(request), repr(request_from_pickle)) + + +class HostValidationTests(SimpleTestCase): + poisoned_hosts = [ + "example.com@evil.tld", + "example.com:dr.frankenstein@evil.tld", + "example.com:dr.frankenstein@evil.tld:80", + "example.com:80/badpath", + "example.com: recovermypassword.com", + ] + + @override_settings( + USE_X_FORWARDED_HOST=False, + ALLOWED_HOSTS=[ + "forward.com", + "example.com", + "internal.com", + "12.34.56.78", + "[2001:19f0:feee::dead:beef:cafe]", + "xn--4ca9at.com", + ".multitenant.com", + "INSENSITIVE.com", + "[::ffff:169.254.169.254]", + ], + ) + def test_http_get_host(self): + # Check if X_FORWARDED_HOST is provided. + request = HttpRequest() + request.META = { + "HTTP_X_FORWARDED_HOST": "forward.com", + "HTTP_HOST": "example.com", + "SERVER_NAME": "internal.com", + "SERVER_PORT": 80, + } + # X_FORWARDED_HOST is ignored. + self.assertEqual(request.get_host(), "example.com") + + # Check if X_FORWARDED_HOST isn't provided. + request = HttpRequest() + request.META = { + "HTTP_HOST": "example.com", + "SERVER_NAME": "internal.com", + "SERVER_PORT": 80, + } + self.assertEqual(request.get_host(), "example.com") + + # Check if HTTP_HOST isn't provided. + request = HttpRequest() + request.META = { + "SERVER_NAME": "internal.com", + "SERVER_PORT": 80, + } + self.assertEqual(request.get_host(), "internal.com") + + # Check if HTTP_HOST isn't provided, and we're on a nonstandard port + request = HttpRequest() + request.META = { + "SERVER_NAME": "internal.com", + "SERVER_PORT": 8042, + } + self.assertEqual(request.get_host(), "internal.com:8042") + + legit_hosts = [ + "example.com", + "example.com:80", + "12.34.56.78", + "12.34.56.78:443", + "[2001:19f0:feee::dead:beef:cafe]", + "[2001:19f0:feee::dead:beef:cafe]:8080", + "xn--4ca9at.com", # Punycode for öäü.com + "anything.multitenant.com", + "multitenant.com", + "insensitive.com", + "example.com.", + "example.com.:80", + "[::ffff:169.254.169.254]", + ] + + for host in legit_hosts: + request = HttpRequest() + request.META = { + "HTTP_HOST": host, + } + request.get_host() + + # Poisoned host headers are rejected as suspicious + for host in chain(self.poisoned_hosts, ["other.com", "example.com.."]): + with self.assertRaises(DisallowedHost): + request = HttpRequest() + request.META = { + "HTTP_HOST": host, + } + request.get_host() + + @override_settings(USE_X_FORWARDED_HOST=True, ALLOWED_HOSTS=["*"]) + def test_http_get_host_with_x_forwarded_host(self): + # Check if X_FORWARDED_HOST is provided. + request = HttpRequest() + request.META = { + "HTTP_X_FORWARDED_HOST": "forward.com", + "HTTP_HOST": "example.com", + "SERVER_NAME": "internal.com", + "SERVER_PORT": 80, + } + # X_FORWARDED_HOST is obeyed. + self.assertEqual(request.get_host(), "forward.com") + + # Check if X_FORWARDED_HOST isn't provided. + request = HttpRequest() + request.META = { + "HTTP_HOST": "example.com", + "SERVER_NAME": "internal.com", + "SERVER_PORT": 80, + } + self.assertEqual(request.get_host(), "example.com") + + # Check if HTTP_HOST isn't provided. + request = HttpRequest() + request.META = { + "SERVER_NAME": "internal.com", + "SERVER_PORT": 80, + } + self.assertEqual(request.get_host(), "internal.com") + + # Check if HTTP_HOST isn't provided, and we're on a nonstandard port + request = HttpRequest() + request.META = { + "SERVER_NAME": "internal.com", + "SERVER_PORT": 8042, + } + self.assertEqual(request.get_host(), "internal.com:8042") + + # Poisoned host headers are rejected as suspicious + legit_hosts = [ + "example.com", + "example.com:80", + "12.34.56.78", + "12.34.56.78:443", + "[2001:19f0:feee::dead:beef:cafe]", + "[2001:19f0:feee::dead:beef:cafe]:8080", + "xn--4ca9at.com", # Punycode for öäü.com + ] + + for host in legit_hosts: + request = HttpRequest() + request.META = { + "HTTP_HOST": host, + } + request.get_host() + + for host in self.poisoned_hosts: + with self.assertRaises(DisallowedHost): + request = HttpRequest() + request.META = { + "HTTP_HOST": host, + } + request.get_host() + + @override_settings(USE_X_FORWARDED_PORT=False) + def test_get_port(self): + request = HttpRequest() + request.META = { + "SERVER_PORT": "8080", + "HTTP_X_FORWARDED_PORT": "80", + } + # Shouldn't use the X-Forwarded-Port header + self.assertEqual(request.get_port(), "8080") + + request = HttpRequest() + request.META = { + "SERVER_PORT": "8080", + } + self.assertEqual(request.get_port(), "8080") + + @override_settings(USE_X_FORWARDED_PORT=True) + def test_get_port_with_x_forwarded_port(self): + request = HttpRequest() + request.META = { + "SERVER_PORT": "8080", + "HTTP_X_FORWARDED_PORT": "80", + } + # Should use the X-Forwarded-Port header + self.assertEqual(request.get_port(), "80") + + request = HttpRequest() + request.META = { + "SERVER_PORT": "8080", + } + self.assertEqual(request.get_port(), "8080") + + @override_settings(DEBUG=True, ALLOWED_HOSTS=[]) + def test_host_validation_in_debug_mode(self): + """ + If ALLOWED_HOSTS is empty and DEBUG is True, variants of localhost are + allowed. + """ + valid_hosts = ["localhost", "subdomain.localhost", "127.0.0.1", "[::1]"] + for host in valid_hosts: + request = HttpRequest() + request.META = {"HTTP_HOST": host} + self.assertEqual(request.get_host(), host) + + # Other hostnames raise a DisallowedHost. + with self.assertRaises(DisallowedHost): + request = HttpRequest() + request.META = {"HTTP_HOST": "example.com"} + request.get_host() + + @override_settings(ALLOWED_HOSTS=[]) + def test_get_host_suggestion_of_allowed_host(self): + """ + get_host() makes helpful suggestions if a valid-looking host is not in + ALLOWED_HOSTS. + """ + msg_invalid_host = "Invalid HTTP_HOST header: %r." + msg_suggestion = msg_invalid_host + " You may need to add %r to ALLOWED_HOSTS." + msg_suggestion2 = ( + msg_invalid_host + + " The domain name provided is not valid according to RFC 1034/1035" + ) + + for host in [ # Valid-looking hosts + "example.com", + "12.34.56.78", + "[2001:19f0:feee::dead:beef:cafe]", + "xn--4ca9at.com", # Punycode for öäü.com + ]: + request = HttpRequest() + request.META = {"HTTP_HOST": host} + with self.assertRaisesMessage( + DisallowedHost, msg_suggestion % (host, host) + ): + request.get_host() + + for domain, port in [ # Valid-looking hosts with a port number + ("example.com", 80), + ("12.34.56.78", 443), + ("[2001:19f0:feee::dead:beef:cafe]", 8080), + ]: + host = "%s:%s" % (domain, port) + request = HttpRequest() + request.META = {"HTTP_HOST": host} + with self.assertRaisesMessage( + DisallowedHost, msg_suggestion % (host, domain) + ): + request.get_host() + + for host in self.poisoned_hosts: + request = HttpRequest() + request.META = {"HTTP_HOST": host} + with self.assertRaisesMessage(DisallowedHost, msg_invalid_host % host): + request.get_host() + + request = HttpRequest() + request.META = {"HTTP_HOST": "invalid_hostname.com"} + with self.assertRaisesMessage( + DisallowedHost, msg_suggestion2 % "invalid_hostname.com" + ): + request.get_host() + + def test_split_domain_port_removes_trailing_dot(self): + domain, port = split_domain_port("example.com.:8080") + self.assertEqual(domain, "example.com") + self.assertEqual(port, "8080") + + +class BuildAbsoluteURITests(SimpleTestCase): + factory = RequestFactory() + + def test_absolute_url(self): + request = HttpRequest() + url = "https://www.example.com/asdf" + self.assertEqual(request.build_absolute_uri(location=url), url) + + def test_host_retrieval(self): + request = HttpRequest() + request.get_host = lambda: "www.example.com" + request.path = "" + self.assertEqual( + request.build_absolute_uri(location="/path/with:colons"), + "http://www.example.com/path/with:colons", + ) + + def test_request_path_begins_with_two_slashes(self): + # //// creates a request with a path beginning with // + request = self.factory.get("////absolute-uri") + tests = ( + # location isn't provided + (None, "http://testserver//absolute-uri"), + # An absolute URL + ("http://example.com/?foo=bar", "http://example.com/?foo=bar"), + # A schema-relative URL + ("//example.com/?foo=bar", "http://example.com/?foo=bar"), + # Relative URLs + ("/foo/bar/", "http://testserver/foo/bar/"), + ("/foo/./bar/", "http://testserver/foo/bar/"), + ("/foo/../bar/", "http://testserver/bar/"), + ("///foo/bar/", "http://testserver/foo/bar/"), + ) + for location, expected_url in tests: + with self.subTest(location=location): + self.assertEqual( + request.build_absolute_uri(location=location), expected_url + ) + + +class RequestHeadersTests(SimpleTestCase): + ENVIRON = { + # Non-headers are ignored. + "PATH_INFO": "/somepath/", + "REQUEST_METHOD": "get", + "wsgi.input": BytesIO(b""), + "SERVER_NAME": "internal.com", + "SERVER_PORT": 80, + # These non-HTTP prefixed headers are included. + "CONTENT_TYPE": "text/html", + "CONTENT_LENGTH": "100", + # All HTTP-prefixed headers are included. + "HTTP_ACCEPT": "*", + "HTTP_HOST": "example.com", + "HTTP_USER_AGENT": "python-requests/1.2.0", + } + + def test_base_request_headers(self): + request = HttpRequest() + request.META = self.ENVIRON + self.assertEqual( + dict(request.headers), + { + "Content-Type": "text/html", + "Content-Length": "100", + "Accept": "*", + "Host": "example.com", + "User-Agent": "python-requests/1.2.0", + }, + ) + + def test_wsgi_request_headers(self): + request = WSGIRequest(self.ENVIRON) + self.assertEqual( + dict(request.headers), + { + "Content-Type": "text/html", + "Content-Length": "100", + "Accept": "*", + "Host": "example.com", + "User-Agent": "python-requests/1.2.0", + }, + ) + + def test_wsgi_request_headers_getitem(self): + request = WSGIRequest(self.ENVIRON) + self.assertEqual(request.headers["User-Agent"], "python-requests/1.2.0") + self.assertEqual(request.headers["user-agent"], "python-requests/1.2.0") + self.assertEqual(request.headers["user_agent"], "python-requests/1.2.0") + self.assertEqual(request.headers["Content-Type"], "text/html") + self.assertEqual(request.headers["Content-Length"], "100") + + def test_wsgi_request_headers_get(self): + request = WSGIRequest(self.ENVIRON) + self.assertEqual(request.headers.get("User-Agent"), "python-requests/1.2.0") + self.assertEqual(request.headers.get("user-agent"), "python-requests/1.2.0") + self.assertEqual(request.headers.get("Content-Type"), "text/html") + self.assertEqual(request.headers.get("Content-Length"), "100") + + +class HttpHeadersTests(SimpleTestCase): + def test_basic(self): + environ = { + "CONTENT_TYPE": "text/html", + "CONTENT_LENGTH": "100", + "HTTP_HOST": "example.com", + } + headers = HttpHeaders(environ) + self.assertEqual(sorted(headers), ["Content-Length", "Content-Type", "Host"]) + self.assertEqual( + headers, + { + "Content-Type": "text/html", + "Content-Length": "100", + "Host": "example.com", + }, + ) + + def test_parse_header_name(self): + tests = ( + ("PATH_INFO", None), + ("HTTP_ACCEPT", "Accept"), + ("HTTP_USER_AGENT", "User-Agent"), + ("HTTP_X_FORWARDED_PROTO", "X-Forwarded-Proto"), + ("CONTENT_TYPE", "Content-Type"), + ("CONTENT_LENGTH", "Content-Length"), + ) + for header, expected in tests: + with self.subTest(header=header): + self.assertEqual(HttpHeaders.parse_header_name(header), expected) |