diff options
author | django-bot <ops@djangoproject.com> | 2022-02-03 20:24:19 +0100 |
---|---|---|
committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-02-07 20:37:05 +0100 |
commit | 9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch) | |
tree | f0506b668a013d0063e5fba3dbf4863b466713ba /tests/file_uploads | |
parent | f68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff) | |
download | django-9c19aff7c7561e3a82978a272ecdaad40dda5c00.tar.gz |
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/file_uploads')
-rw-r--r-- | tests/file_uploads/models.py | 2 | ||||
-rw-r--r-- | tests/file_uploads/tests.py | 873 | ||||
-rw-r--r-- | tests/file_uploads/uploadhandler.py | 26 | ||||
-rw-r--r-- | tests/file_uploads/urls.py | 30 | ||||
-rw-r--r-- | tests/file_uploads/views.py | 42 |
5 files changed, 559 insertions, 414 deletions
diff --git a/tests/file_uploads/models.py b/tests/file_uploads/models.py index 3336cc9d90..7ec1900f50 100644 --- a/tests/file_uploads/models.py +++ b/tests/file_uploads/models.py @@ -2,4 +2,4 @@ from django.db import models class FileModel(models.Model): - testfile = models.FileField(upload_to='test_upload') + testfile = models.FileField(upload_to="test_upload") diff --git a/tests/file_uploads/tests.py b/tests/file_uploads/tests.py index 78898e2b1e..37f9559eef 100644 --- a/tests/file_uploads/tests.py +++ b/tests/file_uploads/tests.py @@ -13,47 +13,52 @@ from django.core.exceptions import SuspiciousFileOperation from django.core.files import temp as tempfile from django.core.files.uploadedfile import SimpleUploadedFile, UploadedFile from django.http.multipartparser import ( - FILE, MultiPartParser, MultiPartParserError, Parser, parse_header, + FILE, + MultiPartParser, + MultiPartParserError, + Parser, + parse_header, ) from django.test import SimpleTestCase, TestCase, client, override_settings from . import uploadhandler from .models import FileModel -UNICODE_FILENAME = 'test-0123456789_中文_Orléans.jpg' +UNICODE_FILENAME = "test-0123456789_中文_Orléans.jpg" MEDIA_ROOT = sys_tempfile.mkdtemp() -UPLOAD_TO = os.path.join(MEDIA_ROOT, 'test_upload') +UPLOAD_TO = os.path.join(MEDIA_ROOT, "test_upload") CANDIDATE_TRAVERSAL_FILE_NAMES = [ - '/tmp/hax0rd.txt', # Absolute path, *nix-style. - 'C:\\Windows\\hax0rd.txt', # Absolute path, win-style. - 'C:/Windows/hax0rd.txt', # Absolute path, broken-style. - '\\tmp\\hax0rd.txt', # Absolute path, broken in a different way. - '/tmp\\hax0rd.txt', # Absolute path, broken by mixing. - 'subdir/hax0rd.txt', # Descendant path, *nix-style. - 'subdir\\hax0rd.txt', # Descendant path, win-style. - 'sub/dir\\hax0rd.txt', # Descendant path, mixed. - '../../hax0rd.txt', # Relative path, *nix-style. - '..\\..\\hax0rd.txt', # Relative path, win-style. - '../..\\hax0rd.txt', # Relative path, mixed. - '../hax0rd.txt', # HTML entities. - '../hax0rd.txt', # HTML entities. + "/tmp/hax0rd.txt", # Absolute path, *nix-style. + "C:\\Windows\\hax0rd.txt", # Absolute path, win-style. + "C:/Windows/hax0rd.txt", # Absolute path, broken-style. + "\\tmp\\hax0rd.txt", # Absolute path, broken in a different way. + "/tmp\\hax0rd.txt", # Absolute path, broken by mixing. + "subdir/hax0rd.txt", # Descendant path, *nix-style. + "subdir\\hax0rd.txt", # Descendant path, win-style. + "sub/dir\\hax0rd.txt", # Descendant path, mixed. + "../../hax0rd.txt", # Relative path, *nix-style. + "..\\..\\hax0rd.txt", # Relative path, win-style. + "../..\\hax0rd.txt", # Relative path, mixed. + "../hax0rd.txt", # HTML entities. + "../hax0rd.txt", # HTML entities. ] CANDIDATE_INVALID_FILE_NAMES = [ - '/tmp/', # Directory, *nix-style. - 'c:\\tmp\\', # Directory, win-style. - '/tmp/.', # Directory dot, *nix-style. - 'c:\\tmp\\.', # Directory dot, *nix-style. - '/tmp/..', # Parent directory, *nix-style. - 'c:\\tmp\\..', # Parent directory, win-style. - '', # Empty filename. + "/tmp/", # Directory, *nix-style. + "c:\\tmp\\", # Directory, win-style. + "/tmp/.", # Directory dot, *nix-style. + "c:\\tmp\\.", # Directory dot, *nix-style. + "/tmp/..", # Parent directory, *nix-style. + "c:\\tmp\\..", # Parent directory, win-style. + "", # Empty filename. ] -@override_settings(MEDIA_ROOT=MEDIA_ROOT, ROOT_URLCONF='file_uploads.urls', MIDDLEWARE=[]) +@override_settings( + MEDIA_ROOT=MEDIA_ROOT, ROOT_URLCONF="file_uploads.urls", MIDDLEWARE=[] +) class FileUploadTests(TestCase): - @classmethod def setUpClass(cls): super().setUpClass() @@ -62,73 +67,84 @@ class FileUploadTests(TestCase): def test_upload_name_is_validated(self): candidates = [ - '/tmp/', - '/tmp/..', - '/tmp/.', + "/tmp/", + "/tmp/..", + "/tmp/.", ] - if sys.platform == 'win32': - candidates.extend([ - 'c:\\tmp\\', - 'c:\\tmp\\..', - 'c:\\tmp\\.', - ]) + if sys.platform == "win32": + candidates.extend( + [ + "c:\\tmp\\", + "c:\\tmp\\..", + "c:\\tmp\\.", + ] + ) for file_name in candidates: with self.subTest(file_name=file_name): self.assertRaises(SuspiciousFileOperation, UploadedFile, name=file_name) def test_simple_upload(self): - with open(__file__, 'rb') as fp: + with open(__file__, "rb") as fp: post_data = { - 'name': 'Ringo', - 'file_field': fp, + "name": "Ringo", + "file_field": fp, } - response = self.client.post('/upload/', post_data) + response = self.client.post("/upload/", post_data) self.assertEqual(response.status_code, 200) def test_large_upload(self): file = tempfile.NamedTemporaryFile with file(suffix=".file1") as file1, file(suffix=".file2") as file2: - file1.write(b'a' * (2 ** 21)) + file1.write(b"a" * (2**21)) file1.seek(0) - file2.write(b'a' * (10 * 2 ** 20)) + file2.write(b"a" * (10 * 2**20)) file2.seek(0) post_data = { - 'name': 'Ringo', - 'file_field1': file1, - 'file_field2': file2, + "name": "Ringo", + "file_field1": file1, + "file_field2": file2, } for key in list(post_data): try: - post_data[key + '_hash'] = hashlib.sha1(post_data[key].read()).hexdigest() + post_data[key + "_hash"] = hashlib.sha1( + post_data[key].read() + ).hexdigest() post_data[key].seek(0) except AttributeError: - post_data[key + '_hash'] = hashlib.sha1(post_data[key].encode()).hexdigest() + post_data[key + "_hash"] = hashlib.sha1( + post_data[key].encode() + ).hexdigest() - response = self.client.post('/verify/', post_data) + response = self.client.post("/verify/", post_data) self.assertEqual(response.status_code, 200) def _test_base64_upload(self, content, encode=base64.b64encode): - payload = client.FakePayload("\r\n".join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name="file"; filename="test.txt"', - 'Content-Type: application/octet-stream', - 'Content-Transfer-Encoding: base64', - ''])) - payload.write(b'\r\n' + encode(content.encode()) + b'\r\n') - payload.write('--' + client.BOUNDARY + '--\r\n') + payload = client.FakePayload( + "\r\n".join( + [ + "--" + client.BOUNDARY, + 'Content-Disposition: form-data; name="file"; filename="test.txt"', + "Content-Type: application/octet-stream", + "Content-Transfer-Encoding: base64", + "", + ] + ) + ) + payload.write(b"\r\n" + encode(content.encode()) + b"\r\n") + payload.write("--" + client.BOUNDARY + "--\r\n") r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': "/echo_content/", - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/echo_content/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) - self.assertEqual(response.json()['file'], content) + self.assertEqual(response.json()["file"], content) def test_base64_upload(self): self._test_base64_upload("This data will be transmitted base64-encoded.") @@ -140,32 +156,36 @@ class FileUploadTests(TestCase): self._test_base64_upload("Big data" * 68000, encode=base64.encodebytes) def test_base64_invalid_upload(self): - payload = client.FakePayload('\r\n'.join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name="file"; filename="test.txt"', - 'Content-Type: application/octet-stream', - 'Content-Transfer-Encoding: base64', - '' - ])) - payload.write(b'\r\n!\r\n') - payload.write('--' + client.BOUNDARY + '--\r\n') + payload = client.FakePayload( + "\r\n".join( + [ + "--" + client.BOUNDARY, + 'Content-Disposition: form-data; name="file"; filename="test.txt"', + "Content-Type: application/octet-stream", + "Content-Transfer-Encoding: base64", + "", + ] + ) + ) + payload.write(b"\r\n!\r\n") + payload.write("--" + client.BOUNDARY + "--\r\n") r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': '/echo_content/', - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/echo_content/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) - self.assertEqual(response.json()['file'], '') + self.assertEqual(response.json()["file"], "") def test_unicode_file_name(self): with sys_tempfile.TemporaryDirectory() as temp_dir: # This file contains Chinese symbols and an accented char in the name. - with open(os.path.join(temp_dir, UNICODE_FILENAME), 'w+b') as file1: - file1.write(b'b' * (2 ** 10)) + with open(os.path.join(temp_dir, UNICODE_FILENAME), "w+b") as file1: + file1.write(b"b" * (2**10)) file1.seek(0) - response = self.client.post('/unicode_name/', {'file_unicode': file1}) + response = self.client.post("/unicode_name/", {"file_unicode": file1}) self.assertEqual(response.status_code, 200) def test_unicode_file_name_rfc2231(self): @@ -174,21 +194,26 @@ class FileUploadTests(TestCase): (#22971). """ payload = client.FakePayload() - payload.write('\r\n'.join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name="file_unicode"; filename*=UTF-8\'\'%s' % quote(UNICODE_FILENAME), - 'Content-Type: application/octet-stream', - '', - 'You got pwnd.\r\n', - '\r\n--' + client.BOUNDARY + '--\r\n' - ])) + payload.write( + "\r\n".join( + [ + "--" + client.BOUNDARY, + "Content-Disposition: form-data; name=\"file_unicode\"; filename*=UTF-8''%s" + % quote(UNICODE_FILENAME), + "Content-Type: application/octet-stream", + "", + "You got pwnd.\r\n", + "\r\n--" + client.BOUNDARY + "--\r\n", + ] + ) + ) r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': "/unicode_name/", - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/unicode_name/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) self.assertEqual(response.status_code, 200) @@ -200,66 +225,75 @@ class FileUploadTests(TestCase): """ payload = client.FakePayload() payload.write( - '\r\n'.join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name*=UTF-8\'\'file_unicode; filename*=UTF-8\'\'%s' % quote( - UNICODE_FILENAME - ), - 'Content-Type: application/octet-stream', - '', - 'You got pwnd.\r\n', - '\r\n--' + client.BOUNDARY + '--\r\n' - ]) + "\r\n".join( + [ + "--" + client.BOUNDARY, + "Content-Disposition: form-data; name*=UTF-8''file_unicode; filename*=UTF-8''%s" + % quote(UNICODE_FILENAME), + "Content-Type: application/octet-stream", + "", + "You got pwnd.\r\n", + "\r\n--" + client.BOUNDARY + "--\r\n", + ] + ) ) r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': "/unicode_name/", - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/unicode_name/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) self.assertEqual(response.status_code, 200) def test_unicode_file_name_rfc2231_with_double_quotes(self): payload = client.FakePayload() - payload.write('\r\n'.join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name="file_unicode"; ' - 'filename*="UTF-8\'\'%s"' % quote(UNICODE_FILENAME), - 'Content-Type: application/octet-stream', - '', - 'You got pwnd.\r\n', - '\r\n--' + client.BOUNDARY + '--\r\n', - ])) + payload.write( + "\r\n".join( + [ + "--" + client.BOUNDARY, + 'Content-Disposition: form-data; name="file_unicode"; ' + "filename*=\"UTF-8''%s\"" % quote(UNICODE_FILENAME), + "Content-Type: application/octet-stream", + "", + "You got pwnd.\r\n", + "\r\n--" + client.BOUNDARY + "--\r\n", + ] + ) + ) r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': '/unicode_name/', - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/unicode_name/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) self.assertEqual(response.status_code, 200) def test_unicode_name_rfc2231_with_double_quotes(self): payload = client.FakePayload() - payload.write('\r\n'.join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name*="UTF-8\'\'file_unicode"; ' - 'filename*="UTF-8\'\'%s"' % quote(UNICODE_FILENAME), - 'Content-Type: application/octet-stream', - '', - 'You got pwnd.\r\n', - '\r\n--' + client.BOUNDARY + '--\r\n' - ])) + payload.write( + "\r\n".join( + [ + "--" + client.BOUNDARY, + "Content-Disposition: form-data; name*=\"UTF-8''file_unicode\"; " + "filename*=\"UTF-8''%s\"" % quote(UNICODE_FILENAME), + "Content-Type: application/octet-stream", + "", + "You got pwnd.\r\n", + "\r\n--" + client.BOUNDARY + "--\r\n", + ] + ) + ) r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': '/unicode_name/', - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/unicode_name/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) self.assertEqual(response.status_code, 200) @@ -270,30 +304,35 @@ class FileUploadTests(TestCase): sanitization) should be okay. """ filenames = [ - '', + "", # Normalized by MultiPartParser.IE_sanitize(). - 'C:\\Windows\\', + "C:\\Windows\\", # Normalized by os.path.basename(). - '/', - 'ends-with-slash/', + "/", + "ends-with-slash/", ] payload = client.FakePayload() for i, name in enumerate(filenames): - payload.write('\r\n'.join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name), - 'Content-Type: application/octet-stream', - '', - 'You got pwnd.\r\n' - ])) - payload.write('\r\n--' + client.BOUNDARY + '--\r\n') + payload.write( + "\r\n".join( + [ + "--" + client.BOUNDARY, + 'Content-Disposition: form-data; name="file%s"; filename="%s"' + % (i, name), + "Content-Type: application/octet-stream", + "", + "You got pwnd.\r\n", + ] + ) + ) + payload.write("\r\n--" + client.BOUNDARY + "--\r\n") r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': '/echo/', - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/echo/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) self.assertEqual(response.status_code, 200) @@ -301,30 +340,34 @@ class FileUploadTests(TestCase): # Empty filenames should be ignored received = response.json() for i, name in enumerate(filenames): - self.assertIsNone(received.get('file%s' % i)) + self.assertIsNone(received.get("file%s" % i)) def test_non_printable_chars_in_file_names(self): - file_name = 'non-\x00printable\x00\n_chars.txt\x00' + file_name = "non-\x00printable\x00\n_chars.txt\x00" payload = client.FakePayload() - payload.write('\r\n'.join([ - '--' + client.BOUNDARY, - f'Content-Disposition: form-data; name="file"; filename="{file_name}"', - 'Content-Type: application/octet-stream', - '', - 'You got pwnd.\r\n' - ])) - payload.write('\r\n--' + client.BOUNDARY + '--\r\n') + payload.write( + "\r\n".join( + [ + "--" + client.BOUNDARY, + f'Content-Disposition: form-data; name="file"; filename="{file_name}"', + "Content-Type: application/octet-stream", + "", + "You got pwnd.\r\n", + ] + ) + ) + payload.write("\r\n--" + client.BOUNDARY + "--\r\n") r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': '/echo/', - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/echo/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) # Non-printable chars are sanitized. received = response.json() - self.assertEqual(received['file'], 'non-printable_chars.txt') + self.assertEqual(received["file"], "non-printable_chars.txt") def test_dangerous_file_names(self): """Uploaded file names should be sanitized before ever reaching the view.""" @@ -335,21 +378,26 @@ class FileUploadTests(TestCase): # trying such an attack. payload = client.FakePayload() for i, name in enumerate(CANDIDATE_TRAVERSAL_FILE_NAMES): - payload.write('\r\n'.join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name), - 'Content-Type: application/octet-stream', - '', - 'You got pwnd.\r\n' - ])) - payload.write('\r\n--' + client.BOUNDARY + '--\r\n') + payload.write( + "\r\n".join( + [ + "--" + client.BOUNDARY, + 'Content-Disposition: form-data; name="file%s"; filename="%s"' + % (i, name), + "Content-Type: application/octet-stream", + "", + "You got pwnd.\r\n", + ] + ) + ) + payload.write("\r\n--" + client.BOUNDARY + "--\r\n") r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': "/echo/", - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/echo/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) # The filenames should have been sanitized by the time it got to the view. @@ -360,84 +408,99 @@ class FileUploadTests(TestCase): def test_filename_overflow(self): """File names over 256 characters (dangerous on some platforms) get fixed up.""" - long_str = 'f' * 300 + long_str = "f" * 300 cases = [ # field name, filename, expected - ('long_filename', '%s.txt' % long_str, '%s.txt' % long_str[:251]), - ('long_extension', 'foo.%s' % long_str, '.%s' % long_str[:254]), - ('no_extension', long_str, long_str[:255]), - ('no_filename', '.%s' % long_str, '.%s' % long_str[:254]), - ('long_everything', '%s.%s' % (long_str, long_str), '.%s' % long_str[:254]), + ("long_filename", "%s.txt" % long_str, "%s.txt" % long_str[:251]), + ("long_extension", "foo.%s" % long_str, ".%s" % long_str[:254]), + ("no_extension", long_str, long_str[:255]), + ("no_filename", ".%s" % long_str, ".%s" % long_str[:254]), + ("long_everything", "%s.%s" % (long_str, long_str), ".%s" % long_str[:254]), ] payload = client.FakePayload() for name, filename, _ in cases: - payload.write("\r\n".join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name="{}"; filename="{}"', - 'Content-Type: application/octet-stream', - '', - 'Oops.', - '' - ]).format(name, filename)) - payload.write('\r\n--' + client.BOUNDARY + '--\r\n') + payload.write( + "\r\n".join( + [ + "--" + client.BOUNDARY, + 'Content-Disposition: form-data; name="{}"; filename="{}"', + "Content-Type: application/octet-stream", + "", + "Oops.", + "", + ] + ).format(name, filename) + ) + payload.write("\r\n--" + client.BOUNDARY + "--\r\n") r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': "/echo/", - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/echo/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) result = response.json() for name, _, expected in cases: got = result[name] - self.assertEqual(expected, got, 'Mismatch for {}'.format(name)) - self.assertLess(len(got), 256, - "Got a long file name (%s characters)." % len(got)) + self.assertEqual(expected, got, "Mismatch for {}".format(name)) + self.assertLess( + len(got), 256, "Got a long file name (%s characters)." % len(got) + ) def test_file_content(self): file = tempfile.NamedTemporaryFile - with file(suffix=".ctype_extra") as no_content_type, file(suffix=".ctype_extra") as simple_file: - no_content_type.write(b'no content') + with file(suffix=".ctype_extra") as no_content_type, file( + suffix=".ctype_extra" + ) as simple_file: + no_content_type.write(b"no content") no_content_type.seek(0) - simple_file.write(b'text content') + simple_file.write(b"text content") simple_file.seek(0) - simple_file.content_type = 'text/plain' - - string_io = StringIO('string content') - bytes_io = BytesIO(b'binary content') - - response = self.client.post('/echo_content/', { - 'no_content_type': no_content_type, - 'simple_file': simple_file, - 'string': string_io, - 'binary': bytes_io, - }) + simple_file.content_type = "text/plain" + + string_io = StringIO("string content") + bytes_io = BytesIO(b"binary content") + + response = self.client.post( + "/echo_content/", + { + "no_content_type": no_content_type, + "simple_file": simple_file, + "string": string_io, + "binary": bytes_io, + }, + ) received = response.json() - self.assertEqual(received['no_content_type'], 'no content') - self.assertEqual(received['simple_file'], 'text content') - self.assertEqual(received['string'], 'string content') - self.assertEqual(received['binary'], 'binary content') + self.assertEqual(received["no_content_type"], "no content") + self.assertEqual(received["simple_file"], "text content") + self.assertEqual(received["string"], "string content") + self.assertEqual(received["binary"], "binary content") def test_content_type_extra(self): """Uploaded files may have content type parameters available.""" file = tempfile.NamedTemporaryFile - with file(suffix=".ctype_extra") as no_content_type, file(suffix=".ctype_extra") as simple_file: - no_content_type.write(b'something') + with file(suffix=".ctype_extra") as no_content_type, file( + suffix=".ctype_extra" + ) as simple_file: + no_content_type.write(b"something") no_content_type.seek(0) - simple_file.write(b'something') + simple_file.write(b"something") simple_file.seek(0) - simple_file.content_type = 'text/plain; test-key=test_value' - - response = self.client.post('/echo_content_type_extra/', { - 'no_content_type': no_content_type, - 'simple_file': simple_file, - }) + simple_file.content_type = "text/plain; test-key=test_value" + + response = self.client.post( + "/echo_content_type_extra/", + { + "no_content_type": no_content_type, + "simple_file": simple_file, + }, + ) received = response.json() - self.assertEqual(received['no_content_type'], {}) - self.assertEqual(received['simple_file'], {'test-key': 'test_value'}) + self.assertEqual(received["no_content_type"], {}) + self.assertEqual(received["simple_file"], {"test-key": "test_value"}) def test_truncated_multipart_handled_gracefully(self): """ @@ -445,22 +508,23 @@ class FileUploadTests(TestCase): attempt to read beyond the end of the stream, and simply will handle the part that can be parsed gracefully. """ - payload_str = "\r\n".join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name="file"; filename="foo.txt"', - 'Content-Type: application/octet-stream', - '', - 'file contents' - '--' + client.BOUNDARY + '--', - '', - ]) + payload_str = "\r\n".join( + [ + "--" + client.BOUNDARY, + 'Content-Disposition: form-data; name="file"; filename="foo.txt"', + "Content-Type: application/octet-stream", + "", + "file contents" "--" + client.BOUNDARY + "--", + "", + ] + ) payload = client.FakePayload(payload_str[:-10]) r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': '/echo/', - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/echo/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } self.assertEqual(self.client.request(**r).json(), {}) @@ -470,11 +534,11 @@ class FileUploadTests(TestCase): an empty QueryDict. """ r = { - 'CONTENT_LENGTH': 0, - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': '/echo/', - 'REQUEST_METHOD': 'POST', - 'wsgi.input': client.FakePayload(b''), + "CONTENT_LENGTH": 0, + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/echo/", + "REQUEST_METHOD": "POST", + "wsgi.input": client.FakePayload(b""), } self.assertEqual(self.client.request(**r).json(), {}) @@ -482,34 +546,36 @@ class FileUploadTests(TestCase): file = tempfile.NamedTemporaryFile with file() as smallfile, file() as bigfile: # A small file (under the 5M quota) - smallfile.write(b'a' * (2 ** 21)) + smallfile.write(b"a" * (2**21)) smallfile.seek(0) # A big file (over the quota) - bigfile.write(b'a' * (10 * 2 ** 20)) + bigfile.write(b"a" * (10 * 2**20)) bigfile.seek(0) # Small file posting should work. - self.assertIn('f', self.client.post('/quota/', {'f': smallfile}).json()) + self.assertIn("f", self.client.post("/quota/", {"f": smallfile}).json()) # Large files don't go through. - self.assertNotIn('f', self.client.post("/quota/", {'f': bigfile}).json()) + self.assertNotIn("f", self.client.post("/quota/", {"f": bigfile}).json()) def test_broken_custom_upload_handler(self): with tempfile.NamedTemporaryFile() as file: - file.write(b'a' * (2 ** 21)) + file.write(b"a" * (2**21)) file.seek(0) - msg = 'You cannot alter upload handlers after the upload has been processed.' + msg = ( + "You cannot alter upload handlers after the upload has been processed." + ) with self.assertRaisesMessage(AttributeError, msg): - self.client.post('/quota/broken/', {'f': file}) + self.client.post("/quota/broken/", {"f": file}) def test_stop_upload_temporary_file_handler(self): with tempfile.NamedTemporaryFile() as temp_file: - temp_file.write(b'a') + temp_file.write(b"a") temp_file.seek(0) - response = self.client.post('/temp_file/stop_upload/', {'file': temp_file}) - temp_path = response.json()['temp_path'] + response = self.client.post("/temp_file/stop_upload/", {"file": temp_file}) + temp_path = response.json()["temp_path"] self.assertIs(os.path.exists(temp_path), False) def test_upload_interrupted_temporary_file_handler(self): @@ -523,74 +589,83 @@ class FileUploadTests(TestCase): return with tempfile.NamedTemporaryFile() as temp_file: - temp_file.write(b'a') + temp_file.write(b"a") temp_file.seek(0) with mock.patch( - 'django.http.multipartparser.Parser', + "django.http.multipartparser.Parser", MockedParser, ): response = self.client.post( - '/temp_file/upload_interrupted/', - {'file': temp_file}, + "/temp_file/upload_interrupted/", + {"file": temp_file}, ) - temp_path = response.json()['temp_path'] + temp_path = response.json()["temp_path"] self.assertIs(os.path.exists(temp_path), False) def test_fileupload_getlist(self): file = tempfile.NamedTemporaryFile with file() as file1, file() as file2, file() as file2a: - file1.write(b'a' * (2 ** 23)) + file1.write(b"a" * (2**23)) file1.seek(0) - file2.write(b'a' * (2 * 2 ** 18)) + file2.write(b"a" * (2 * 2**18)) file2.seek(0) - file2a.write(b'a' * (5 * 2 ** 20)) + file2a.write(b"a" * (5 * 2**20)) file2a.seek(0) - response = self.client.post('/getlist_count/', { - 'file1': file1, - 'field1': 'test', - 'field2': 'test3', - 'field3': 'test5', - 'field4': 'test6', - 'field5': 'test7', - 'file2': (file2, file2a) - }) + response = self.client.post( + "/getlist_count/", + { + "file1": file1, + "field1": "test", + "field2": "test3", + "field3": "test5", + "field4": "test6", + "field5": "test7", + "file2": (file2, file2a), + }, + ) got = response.json() - self.assertEqual(got.get('file1'), 1) - self.assertEqual(got.get('file2'), 2) + self.assertEqual(got.get("file1"), 1) + self.assertEqual(got.get("file2"), 2) def test_fileuploads_closed_at_request_end(self): file = tempfile.NamedTemporaryFile with file() as f1, file() as f2a, file() as f2b: - response = self.client.post('/fd_closing/t/', { - 'file': f1, - 'file2': (f2a, f2b), - }) + response = self.client.post( + "/fd_closing/t/", + { + "file": f1, + "file2": (f2a, f2b), + }, + ) request = response.wsgi_request # The files were parsed. - self.assertTrue(hasattr(request, '_files')) + self.assertTrue(hasattr(request, "_files")) - file = request._files['file'] + file = request._files["file"] self.assertTrue(file.closed) - files = request._files.getlist('file2') + files = request._files.getlist("file2") self.assertTrue(files[0].closed) self.assertTrue(files[1].closed) def test_no_parsing_triggered_by_fd_closing(self): file = tempfile.NamedTemporaryFile with file() as f1, file() as f2a, file() as f2b: - response = self.client.post('/fd_closing/f/', { - 'file': f1, - 'file2': (f2a, f2b), - }) + response = self.client.post( + "/fd_closing/f/", + { + "file": f1, + "file2": (f2a, f2b), + }, + ) request = response.wsgi_request # The fd closing logic doesn't trigger parsing of the stream - self.assertFalse(hasattr(request, '_files')) + self.assertFalse(hasattr(request, "_files")) def test_file_error_blocking(self): """ @@ -599,8 +674,10 @@ class FileUploadTests(TestCase): access POST while handling an error in parsing POST. This shouldn't cause an infinite loop! """ + class POSTAccessingHandler(client.ClientHandler): """A handler that'll access POST during an exception.""" + def handle_uncaught_exception(self, request, resolver, exc_info): ret = super().handle_uncaught_exception(request, resolver, exc_info) request.POST # evaluate @@ -611,25 +688,25 @@ class FileUploadTests(TestCase): # this test would fail. So we need to know exactly what kind of error # it raises when there is an attempt to read more than the available bytes: try: - client.FakePayload(b'a').read(2) + client.FakePayload(b"a").read(2) except Exception as err: reference_error = err # install the custom handler that tries to access request.POST self.client.handler = POSTAccessingHandler() - with open(__file__, 'rb') as fp: + with open(__file__, "rb") as fp: post_data = { - 'name': 'Ringo', - 'file_field': fp, + "name": "Ringo", + "file_field": fp, } try: - self.client.post('/upload_errors/', post_data) + self.client.post("/upload_errors/", post_data) except reference_error.__class__ as err: self.assertNotEqual( str(err), str(reference_error), - "Caught a repeated exception that'll cause an infinite loop in file uploads." + "Caught a repeated exception that'll cause an infinite loop in file uploads.", ) except Exception as err: # CustomUploadError is the error that should have been raised @@ -643,64 +720,66 @@ class FileUploadTests(TestCase): # Synthesize the contents of a file upload with a mixed case filename # so we don't have to carry such a file in the Django tests source code # tree. - vars = {'boundary': 'oUrBoUnDaRyStRiNg'} + vars = {"boundary": "oUrBoUnDaRyStRiNg"} post_data = [ - '--%(boundary)s', + "--%(boundary)s", 'Content-Disposition: form-data; name="file_field"; filename="MiXeD_cAsE.txt"', - 'Content-Type: application/octet-stream', - '', - 'file contents\n', - '--%(boundary)s--\r\n', + "Content-Type: application/octet-stream", + "", + "file contents\n", + "--%(boundary)s--\r\n", ] response = self.client.post( - '/filename_case/', - '\r\n'.join(post_data) % vars, - 'multipart/form-data; boundary=%(boundary)s' % vars + "/filename_case/", + "\r\n".join(post_data) % vars, + "multipart/form-data; boundary=%(boundary)s" % vars, ) self.assertEqual(response.status_code, 200) id = int(response.content) obj = FileModel.objects.get(pk=id) # The name of the file uploaded and the file stored in the server-side # shouldn't differ. - self.assertEqual(os.path.basename(obj.testfile.path), 'MiXeD_cAsE.txt') + self.assertEqual(os.path.basename(obj.testfile.path), "MiXeD_cAsE.txt") def test_filename_traversal_upload(self): os.makedirs(UPLOAD_TO, exist_ok=True) tests = [ - '../test.txt', - '../test.txt', + "../test.txt", + "../test.txt", ] for file_name in tests: with self.subTest(file_name=file_name): payload = client.FakePayload() payload.write( - '\r\n'.join([ - '--' + client.BOUNDARY, - 'Content-Disposition: form-data; name="my_file"; ' - 'filename="%s";' % file_name, - 'Content-Type: text/plain', - '', - 'file contents.\r\n', - '\r\n--' + client.BOUNDARY + '--\r\n', - ]), + "\r\n".join( + [ + "--" + client.BOUNDARY, + 'Content-Disposition: form-data; name="my_file"; ' + 'filename="%s";' % file_name, + "Content-Type: text/plain", + "", + "file contents.\r\n", + "\r\n--" + client.BOUNDARY + "--\r\n", + ] + ), ) r = { - 'CONTENT_LENGTH': len(payload), - 'CONTENT_TYPE': client.MULTIPART_CONTENT, - 'PATH_INFO': '/upload_traversal/', - 'REQUEST_METHOD': 'POST', - 'wsgi.input': payload, + "CONTENT_LENGTH": len(payload), + "CONTENT_TYPE": client.MULTIPART_CONTENT, + "PATH_INFO": "/upload_traversal/", + "REQUEST_METHOD": "POST", + "wsgi.input": payload, } response = self.client.request(**r) result = response.json() self.assertEqual(response.status_code, 200) - self.assertEqual(result['file_name'], 'test.txt') + self.assertEqual(result["file_name"], "test.txt") self.assertIs( - os.path.exists(os.path.join(MEDIA_ROOT, 'test.txt')), + os.path.exists(os.path.join(MEDIA_ROOT, "test.txt")), False, ) self.assertIs( - os.path.exists(os.path.join(UPLOAD_TO, 'test.txt')), + os.path.exists(os.path.join(UPLOAD_TO, "test.txt")), True, ) @@ -711,6 +790,7 @@ class DirectoryCreationTests(SimpleTestCase): Tests for error handling during directory creation via _save_FIELD_file (ticket #6450) """ + @classmethod def setUpClass(cls): super().setUpClass() @@ -720,85 +800,128 @@ class DirectoryCreationTests(SimpleTestCase): def setUp(self): self.obj = FileModel() - @unittest.skipIf(sys.platform == 'win32', "Python on Windows doesn't have working os.chmod().") + @unittest.skipIf( + sys.platform == "win32", "Python on Windows doesn't have working os.chmod()." + ) def test_readonly_root(self): """Permission errors are not swallowed""" os.chmod(MEDIA_ROOT, 0o500) self.addCleanup(os.chmod, MEDIA_ROOT, 0o700) with self.assertRaises(PermissionError): - self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'), save=False) + self.obj.testfile.save( + "foo.txt", SimpleUploadedFile("foo.txt", b"x"), save=False + ) def test_not_a_directory(self): # Create a file with the upload directory name - open(UPLOAD_TO, 'wb').close() + open(UPLOAD_TO, "wb").close() self.addCleanup(os.remove, UPLOAD_TO) - msg = '%s exists and is not a directory.' % UPLOAD_TO + msg = "%s exists and is not a directory." % UPLOAD_TO with self.assertRaisesMessage(FileExistsError, msg): - with SimpleUploadedFile('foo.txt', b'x') as file: - self.obj.testfile.save('foo.txt', file, save=False) + with SimpleUploadedFile("foo.txt", b"x") as file: + self.obj.testfile.save("foo.txt", file, save=False) class MultiParserTests(SimpleTestCase): - def test_empty_upload_handlers(self): # We're not actually parsing here; just checking if the parser properly # instantiates with empty upload handlers. - MultiPartParser({ - 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo', - 'CONTENT_LENGTH': '1' - }, StringIO('x'), [], 'utf-8') + MultiPartParser( + { + "CONTENT_TYPE": "multipart/form-data; boundary=_foo", + "CONTENT_LENGTH": "1", + }, + StringIO("x"), + [], + "utf-8", + ) def test_invalid_content_type(self): - with self.assertRaisesMessage(MultiPartParserError, 'Invalid Content-Type: text/plain'): - MultiPartParser({ - 'CONTENT_TYPE': 'text/plain', - 'CONTENT_LENGTH': '1', - }, StringIO('x'), [], 'utf-8') + with self.assertRaisesMessage( + MultiPartParserError, "Invalid Content-Type: text/plain" + ): + MultiPartParser( + { + "CONTENT_TYPE": "text/plain", + "CONTENT_LENGTH": "1", + }, + StringIO("x"), + [], + "utf-8", + ) def test_negative_content_length(self): - with self.assertRaisesMessage(MultiPartParserError, 'Invalid content length: -1'): - MultiPartParser({ - 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo', - 'CONTENT_LENGTH': -1, - }, StringIO('x'), [], 'utf-8') + with self.assertRaisesMessage( + MultiPartParserError, "Invalid content length: -1" + ): + MultiPartParser( + { + "CONTENT_TYPE": "multipart/form-data; boundary=_foo", + "CONTENT_LENGTH": -1, + }, + StringIO("x"), + [], + "utf-8", + ) def test_bad_type_content_length(self): - multipart_parser = MultiPartParser({ - 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo', - 'CONTENT_LENGTH': 'a', - }, StringIO('x'), [], 'utf-8') + multipart_parser = MultiPartParser( + { + "CONTENT_TYPE": "multipart/form-data; boundary=_foo", + "CONTENT_LENGTH": "a", + }, + StringIO("x"), + [], + "utf-8", + ) self.assertEqual(multipart_parser._content_length, 0) def test_sanitize_file_name(self): - parser = MultiPartParser({ - 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo', - 'CONTENT_LENGTH': '1' - }, StringIO('x'), [], 'utf-8') + parser = MultiPartParser( + { + "CONTENT_TYPE": "multipart/form-data; boundary=_foo", + "CONTENT_LENGTH": "1", + }, + StringIO("x"), + [], + "utf-8", + ) for file_name in CANDIDATE_TRAVERSAL_FILE_NAMES: with self.subTest(file_name=file_name): - self.assertEqual(parser.sanitize_file_name(file_name), 'hax0rd.txt') + self.assertEqual(parser.sanitize_file_name(file_name), "hax0rd.txt") def test_sanitize_invalid_file_name(self): - parser = MultiPartParser({ - 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo', - 'CONTENT_LENGTH': '1', - }, StringIO('x'), [], 'utf-8') + parser = MultiPartParser( + { + "CONTENT_TYPE": "multipart/form-data; boundary=_foo", + "CONTENT_LENGTH": "1", + }, + StringIO("x"), + [], + "utf-8", + ) for file_name in CANDIDATE_INVALID_FILE_NAMES: with self.subTest(file_name=file_name): self.assertIsNone(parser.sanitize_file_name(file_name)) def test_rfc2231_parsing(self): test_data = ( - (b"Content-Type: application/x-stuff; title*=us-ascii'en-us'This%20is%20%2A%2A%2Afun%2A%2A%2A", - "This is ***fun***"), - (b"Content-Type: application/x-stuff; title*=UTF-8''foo-%c3%a4.html", - "foo-ä.html"), - (b"Content-Type: application/x-stuff; title*=iso-8859-1''foo-%E4.html", - "foo-ä.html"), + ( + b"Content-Type: application/x-stuff; title*=us-ascii'en-us'This%20is%20%2A%2A%2Afun%2A%2A%2A", + "This is ***fun***", + ), + ( + b"Content-Type: application/x-stuff; title*=UTF-8''foo-%c3%a4.html", + "foo-ä.html", + ), + ( + b"Content-Type: application/x-stuff; title*=iso-8859-1''foo-%E4.html", + "foo-ä.html", + ), ) for raw_line, expected_title in test_data: parsed = parse_header(raw_line) - self.assertEqual(parsed[1]['title'], expected_title) + self.assertEqual(parsed[1]["title"], expected_title) def test_rfc2231_wrong_title(self): """ @@ -806,13 +929,13 @@ class MultiParserTests(SimpleTestCase): Parsing should not crash (#24209). """ test_data = ( - (b"Content-Type: application/x-stuff; title*='This%20is%20%2A%2A%2Afun%2A%2A%2A", - b"'This%20is%20%2A%2A%2Afun%2A%2A%2A"), - (b"Content-Type: application/x-stuff; title*='foo.html", - b"'foo.html"), - (b"Content-Type: application/x-stuff; title*=bar.html", - b"bar.html"), + ( + b"Content-Type: application/x-stuff; title*='This%20is%20%2A%2A%2Afun%2A%2A%2A", + b"'This%20is%20%2A%2A%2Afun%2A%2A%2A", + ), + (b"Content-Type: application/x-stuff; title*='foo.html", b"'foo.html"), + (b"Content-Type: application/x-stuff; title*=bar.html", b"bar.html"), ) for raw_line, expected_title in test_data: parsed = parse_header(raw_line) - self.assertEqual(parsed[1]['title'], expected_title) + self.assertEqual(parsed[1]["title"], expected_title) diff --git a/tests/file_uploads/uploadhandler.py b/tests/file_uploads/uploadhandler.py index eac6de037c..eecbc6dc9b 100644 --- a/tests/file_uploads/uploadhandler.py +++ b/tests/file_uploads/uploadhandler.py @@ -5,7 +5,9 @@ import os from tempfile import NamedTemporaryFile from django.core.files.uploadhandler import ( - FileUploadHandler, StopUpload, TemporaryFileUploadHandler, + FileUploadHandler, + StopUpload, + TemporaryFileUploadHandler, ) @@ -15,7 +17,7 @@ class QuotaUploadHandler(FileUploadHandler): (5MB) is uploaded. """ - QUOTA = 5 * 2 ** 20 # 5 MB + QUOTA = 5 * 2**20 # 5 MB def __init__(self, request=None): super().__init__(request) @@ -33,6 +35,7 @@ class QuotaUploadHandler(FileUploadHandler): class StopUploadTemporaryFileHandler(TemporaryFileUploadHandler): """A handler that raises a StopUpload exception.""" + def receive_data_chunk(self, raw_data, start): raise StopUpload() @@ -43,12 +46,14 @@ class CustomUploadError(Exception): class ErroringUploadHandler(FileUploadHandler): """A handler that raises an exception.""" + def receive_data_chunk(self, raw_data, start): raise CustomUploadError("Oops!") class TraversalUploadHandler(FileUploadHandler): """A handler with potential directory-traversal vulnerability.""" + def __init__(self, request=None): from .views import UPLOAD_TO @@ -58,19 +63,28 @@ class TraversalUploadHandler(FileUploadHandler): def file_complete(self, file_size): self.file.seek(0) self.file.size = file_size - with open(os.path.join(self.upload_dir, self.file_name), 'wb') as fp: + with open(os.path.join(self.upload_dir, self.file_name), "wb") as fp: fp.write(self.file.read()) return self.file def new_file( - self, field_name, file_name, content_type, content_length, charset=None, + self, + field_name, + file_name, + content_type, + content_length, + charset=None, content_type_extra=None, ): super().new_file( - file_name, file_name, content_length, content_length, charset, + file_name, + file_name, + content_length, + content_length, + charset, content_type_extra, ) - self.file = NamedTemporaryFile(suffix='.upload', dir=self.upload_dir) + self.file = NamedTemporaryFile(suffix=".upload", dir=self.upload_dir) def receive_data_chunk(self, raw_data, start): self.file.write(raw_data) diff --git a/tests/file_uploads/urls.py b/tests/file_uploads/urls.py index 9df5432403..7cb54f3be0 100644 --- a/tests/file_uploads/urls.py +++ b/tests/file_uploads/urls.py @@ -3,19 +3,19 @@ from django.urls import path, re_path from . import views urlpatterns = [ - path('upload/', views.file_upload_view), - path('upload_traversal/', views.file_upload_traversal_view), - path('verify/', views.file_upload_view_verify), - path('unicode_name/', views.file_upload_unicode_name), - path('echo/', views.file_upload_echo), - path('echo_content_type_extra/', views.file_upload_content_type_extra), - path('echo_content/', views.file_upload_echo_content), - path('quota/', views.file_upload_quota), - path('quota/broken/', views.file_upload_quota_broken), - path('getlist_count/', views.file_upload_getlist_count), - path('upload_errors/', views.file_upload_errors), - path('temp_file/stop_upload/', views.file_stop_upload_temporary_file), - path('temp_file/upload_interrupted/', views.file_upload_interrupted_temporary_file), - path('filename_case/', views.file_upload_filename_case_view), - re_path(r'^fd_closing/(?P<access>t|f)/$', views.file_upload_fd_closing), + path("upload/", views.file_upload_view), + path("upload_traversal/", views.file_upload_traversal_view), + path("verify/", views.file_upload_view_verify), + path("unicode_name/", views.file_upload_unicode_name), + path("echo/", views.file_upload_echo), + path("echo_content_type_extra/", views.file_upload_content_type_extra), + path("echo_content/", views.file_upload_echo_content), + path("quota/", views.file_upload_quota), + path("quota/broken/", views.file_upload_quota_broken), + path("getlist_count/", views.file_upload_getlist_count), + path("upload_errors/", views.file_upload_errors), + path("temp_file/stop_upload/", views.file_stop_upload_temporary_file), + path("temp_file/upload_interrupted/", views.file_upload_interrupted_temporary_file), + path("filename_case/", views.file_upload_filename_case_view), + re_path(r"^fd_closing/(?P<access>t|f)/$", views.file_upload_fd_closing), ] diff --git a/tests/file_uploads/views.py b/tests/file_uploads/views.py index 50de6238b4..d5efbba3ce 100644 --- a/tests/file_uploads/views.py +++ b/tests/file_uploads/views.py @@ -8,7 +8,9 @@ from django.http import HttpResponse, HttpResponseServerError, JsonResponse from .models import FileModel from .tests import UNICODE_FILENAME, UPLOAD_TO from .uploadhandler import ( - ErroringUploadHandler, QuotaUploadHandler, StopUploadTemporaryFileHandler, + ErroringUploadHandler, + QuotaUploadHandler, + StopUploadTemporaryFileHandler, TraversalUploadHandler, ) @@ -19,10 +21,12 @@ def file_upload_view(request): """ form_data = request.POST.copy() form_data.update(request.FILES) - if isinstance(form_data.get('file_field'), UploadedFile) and isinstance(form_data['name'], str): + if isinstance(form_data.get("file_field"), UploadedFile) and isinstance( + form_data["name"], str + ): # If a file is posted, the dummy client should only post the file name, # not the full path. - if os.path.dirname(form_data['file_field'].name) != '': + if os.path.dirname(form_data["file_field"].name) != "": return HttpResponseServerError() return HttpResponse() else: @@ -37,11 +41,11 @@ def file_upload_view_verify(request): form_data.update(request.FILES) for key, value in form_data.items(): - if key.endswith('_hash'): + if key.endswith("_hash"): continue - if key + '_hash' not in form_data: + if key + "_hash" not in form_data: continue - submitted_hash = form_data[key + '_hash'] + submitted_hash = form_data[key + "_hash"] if isinstance(value, UploadedFile): new_hash = hashlib.sha1(value.read()).hexdigest() else: @@ -50,7 +54,7 @@ def file_upload_view_verify(request): return HttpResponseServerError() # Adding large file to the database should succeed - largefile = request.FILES['file_field2'] + largefile = request.FILES["file_field2"] obj = FileModel() obj.testfile.save(largefile.name, largefile) @@ -59,13 +63,13 @@ def file_upload_view_verify(request): def file_upload_unicode_name(request): # Check to see if Unicode name came through properly. - if not request.FILES['file_unicode'].name.endswith(UNICODE_FILENAME): + if not request.FILES["file_unicode"].name.endswith(UNICODE_FILENAME): return HttpResponseServerError() # Check to make sure the exotic characters are preserved even # through file save. - uni_named_file = request.FILES['file_unicode'] + uni_named_file = request.FILES["file_unicode"] FileModel.objects.create(testfile=uni_named_file) - full_name = '%s/%s' % (UPLOAD_TO, uni_named_file.name) + full_name = "%s/%s" % (UPLOAD_TO, uni_named_file.name) return HttpResponse() if os.path.exists(full_name) else HttpResponseServerError() @@ -81,9 +85,11 @@ def file_upload_echo_content(request): """ Simple view to echo back the content of uploaded files for tests. """ + def read_and_close(f): with f: return f.read().decode() + r = {k: read_and_close(f) for k, f in request.FILES.items()} return JsonResponse(r) @@ -110,7 +116,7 @@ def file_stop_upload_temporary_file(request): request.upload_handlers.pop(2) request.FILES # Trigger file parsing. return JsonResponse( - {'temp_path': request.upload_handlers[0].file.temporary_file_path()}, + {"temp_path": request.upload_handlers[0].file.temporary_file_path()}, ) @@ -119,7 +125,7 @@ def file_upload_interrupted_temporary_file(request): request.upload_handlers.pop(2) request.FILES # Trigger file parsing. return JsonResponse( - {'temp_path': request.upload_handlers[0].file.temporary_file_path()}, + {"temp_path": request.upload_handlers[0].file.temporary_file_path()}, ) @@ -143,10 +149,10 @@ def file_upload_filename_case_view(request): """ Check adding the file to the database will preserve the filename case. """ - file = request.FILES['file_field'] + file = request.FILES["file_field"] obj = FileModel() obj.testfile.save(file.name, file) - return HttpResponse('%d' % obj.pk) + return HttpResponse("%d" % obj.pk) def file_upload_content_type_extra(request): @@ -155,12 +161,14 @@ def file_upload_content_type_extra(request): """ params = {} for file_name, uploadedfile in request.FILES.items(): - params[file_name] = {k: v.decode() for k, v in uploadedfile.content_type_extra.items()} + params[file_name] = { + k: v.decode() for k, v in uploadedfile.content_type_extra.items() + } return JsonResponse(params) def file_upload_fd_closing(request, access): - if access == 't': + if access == "t": request.FILES # Trigger file parsing. return HttpResponse() @@ -169,5 +177,5 @@ def file_upload_traversal_view(request): request.upload_handlers.insert(0, TraversalUploadHandler()) request.FILES # Trigger file parsing. return JsonResponse( - {'file_name': request.upload_handlers[0].file_name}, + {"file_name": request.upload_handlers[0].file_name}, ) |