summaryrefslogtreecommitdiff
path: root/tests/file_uploads
diff options
context:
space:
mode:
authordjango-bot <ops@djangoproject.com>2022-02-03 20:24:19 +0100
committerMariusz Felisiak <felisiak.mariusz@gmail.com>2022-02-07 20:37:05 +0100
commit9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch)
treef0506b668a013d0063e5fba3dbf4863b466713ba /tests/file_uploads
parentf68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff)
downloaddjango-9c19aff7c7561e3a82978a272ecdaad40dda5c00.tar.gz
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/file_uploads')
-rw-r--r--tests/file_uploads/models.py2
-rw-r--r--tests/file_uploads/tests.py873
-rw-r--r--tests/file_uploads/uploadhandler.py26
-rw-r--r--tests/file_uploads/urls.py30
-rw-r--r--tests/file_uploads/views.py42
5 files changed, 559 insertions, 414 deletions
diff --git a/tests/file_uploads/models.py b/tests/file_uploads/models.py
index 3336cc9d90..7ec1900f50 100644
--- a/tests/file_uploads/models.py
+++ b/tests/file_uploads/models.py
@@ -2,4 +2,4 @@ from django.db import models
class FileModel(models.Model):
- testfile = models.FileField(upload_to='test_upload')
+ testfile = models.FileField(upload_to="test_upload")
diff --git a/tests/file_uploads/tests.py b/tests/file_uploads/tests.py
index 78898e2b1e..37f9559eef 100644
--- a/tests/file_uploads/tests.py
+++ b/tests/file_uploads/tests.py
@@ -13,47 +13,52 @@ from django.core.exceptions import SuspiciousFileOperation
from django.core.files import temp as tempfile
from django.core.files.uploadedfile import SimpleUploadedFile, UploadedFile
from django.http.multipartparser import (
- FILE, MultiPartParser, MultiPartParserError, Parser, parse_header,
+ FILE,
+ MultiPartParser,
+ MultiPartParserError,
+ Parser,
+ parse_header,
)
from django.test import SimpleTestCase, TestCase, client, override_settings
from . import uploadhandler
from .models import FileModel
-UNICODE_FILENAME = 'test-0123456789_中文_Orléans.jpg'
+UNICODE_FILENAME = "test-0123456789_中文_Orléans.jpg"
MEDIA_ROOT = sys_tempfile.mkdtemp()
-UPLOAD_TO = os.path.join(MEDIA_ROOT, 'test_upload')
+UPLOAD_TO = os.path.join(MEDIA_ROOT, "test_upload")
CANDIDATE_TRAVERSAL_FILE_NAMES = [
- '/tmp/hax0rd.txt', # Absolute path, *nix-style.
- 'C:\\Windows\\hax0rd.txt', # Absolute path, win-style.
- 'C:/Windows/hax0rd.txt', # Absolute path, broken-style.
- '\\tmp\\hax0rd.txt', # Absolute path, broken in a different way.
- '/tmp\\hax0rd.txt', # Absolute path, broken by mixing.
- 'subdir/hax0rd.txt', # Descendant path, *nix-style.
- 'subdir\\hax0rd.txt', # Descendant path, win-style.
- 'sub/dir\\hax0rd.txt', # Descendant path, mixed.
- '../../hax0rd.txt', # Relative path, *nix-style.
- '..\\..\\hax0rd.txt', # Relative path, win-style.
- '../..\\hax0rd.txt', # Relative path, mixed.
- '..&#x2F;hax0rd.txt', # HTML entities.
- '..&sol;hax0rd.txt', # HTML entities.
+ "/tmp/hax0rd.txt", # Absolute path, *nix-style.
+ "C:\\Windows\\hax0rd.txt", # Absolute path, win-style.
+ "C:/Windows/hax0rd.txt", # Absolute path, broken-style.
+ "\\tmp\\hax0rd.txt", # Absolute path, broken in a different way.
+ "/tmp\\hax0rd.txt", # Absolute path, broken by mixing.
+ "subdir/hax0rd.txt", # Descendant path, *nix-style.
+ "subdir\\hax0rd.txt", # Descendant path, win-style.
+ "sub/dir\\hax0rd.txt", # Descendant path, mixed.
+ "../../hax0rd.txt", # Relative path, *nix-style.
+ "..\\..\\hax0rd.txt", # Relative path, win-style.
+ "../..\\hax0rd.txt", # Relative path, mixed.
+ "..&#x2F;hax0rd.txt", # HTML entities.
+ "..&sol;hax0rd.txt", # HTML entities.
]
CANDIDATE_INVALID_FILE_NAMES = [
- '/tmp/', # Directory, *nix-style.
- 'c:\\tmp\\', # Directory, win-style.
- '/tmp/.', # Directory dot, *nix-style.
- 'c:\\tmp\\.', # Directory dot, *nix-style.
- '/tmp/..', # Parent directory, *nix-style.
- 'c:\\tmp\\..', # Parent directory, win-style.
- '', # Empty filename.
+ "/tmp/", # Directory, *nix-style.
+ "c:\\tmp\\", # Directory, win-style.
+ "/tmp/.", # Directory dot, *nix-style.
+ "c:\\tmp\\.", # Directory dot, *nix-style.
+ "/tmp/..", # Parent directory, *nix-style.
+ "c:\\tmp\\..", # Parent directory, win-style.
+ "", # Empty filename.
]
-@override_settings(MEDIA_ROOT=MEDIA_ROOT, ROOT_URLCONF='file_uploads.urls', MIDDLEWARE=[])
+@override_settings(
+ MEDIA_ROOT=MEDIA_ROOT, ROOT_URLCONF="file_uploads.urls", MIDDLEWARE=[]
+)
class FileUploadTests(TestCase):
-
@classmethod
def setUpClass(cls):
super().setUpClass()
@@ -62,73 +67,84 @@ class FileUploadTests(TestCase):
def test_upload_name_is_validated(self):
candidates = [
- '/tmp/',
- '/tmp/..',
- '/tmp/.',
+ "/tmp/",
+ "/tmp/..",
+ "/tmp/.",
]
- if sys.platform == 'win32':
- candidates.extend([
- 'c:\\tmp\\',
- 'c:\\tmp\\..',
- 'c:\\tmp\\.',
- ])
+ if sys.platform == "win32":
+ candidates.extend(
+ [
+ "c:\\tmp\\",
+ "c:\\tmp\\..",
+ "c:\\tmp\\.",
+ ]
+ )
for file_name in candidates:
with self.subTest(file_name=file_name):
self.assertRaises(SuspiciousFileOperation, UploadedFile, name=file_name)
def test_simple_upload(self):
- with open(__file__, 'rb') as fp:
+ with open(__file__, "rb") as fp:
post_data = {
- 'name': 'Ringo',
- 'file_field': fp,
+ "name": "Ringo",
+ "file_field": fp,
}
- response = self.client.post('/upload/', post_data)
+ response = self.client.post("/upload/", post_data)
self.assertEqual(response.status_code, 200)
def test_large_upload(self):
file = tempfile.NamedTemporaryFile
with file(suffix=".file1") as file1, file(suffix=".file2") as file2:
- file1.write(b'a' * (2 ** 21))
+ file1.write(b"a" * (2**21))
file1.seek(0)
- file2.write(b'a' * (10 * 2 ** 20))
+ file2.write(b"a" * (10 * 2**20))
file2.seek(0)
post_data = {
- 'name': 'Ringo',
- 'file_field1': file1,
- 'file_field2': file2,
+ "name": "Ringo",
+ "file_field1": file1,
+ "file_field2": file2,
}
for key in list(post_data):
try:
- post_data[key + '_hash'] = hashlib.sha1(post_data[key].read()).hexdigest()
+ post_data[key + "_hash"] = hashlib.sha1(
+ post_data[key].read()
+ ).hexdigest()
post_data[key].seek(0)
except AttributeError:
- post_data[key + '_hash'] = hashlib.sha1(post_data[key].encode()).hexdigest()
+ post_data[key + "_hash"] = hashlib.sha1(
+ post_data[key].encode()
+ ).hexdigest()
- response = self.client.post('/verify/', post_data)
+ response = self.client.post("/verify/", post_data)
self.assertEqual(response.status_code, 200)
def _test_base64_upload(self, content, encode=base64.b64encode):
- payload = client.FakePayload("\r\n".join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name="file"; filename="test.txt"',
- 'Content-Type: application/octet-stream',
- 'Content-Transfer-Encoding: base64',
- '']))
- payload.write(b'\r\n' + encode(content.encode()) + b'\r\n')
- payload.write('--' + client.BOUNDARY + '--\r\n')
+ payload = client.FakePayload(
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="file"; filename="test.txt"',
+ "Content-Type: application/octet-stream",
+ "Content-Transfer-Encoding: base64",
+ "",
+ ]
+ )
+ )
+ payload.write(b"\r\n" + encode(content.encode()) + b"\r\n")
+ payload.write("--" + client.BOUNDARY + "--\r\n")
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': "/echo_content/",
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/echo_content/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
- self.assertEqual(response.json()['file'], content)
+ self.assertEqual(response.json()["file"], content)
def test_base64_upload(self):
self._test_base64_upload("This data will be transmitted base64-encoded.")
@@ -140,32 +156,36 @@ class FileUploadTests(TestCase):
self._test_base64_upload("Big data" * 68000, encode=base64.encodebytes)
def test_base64_invalid_upload(self):
- payload = client.FakePayload('\r\n'.join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name="file"; filename="test.txt"',
- 'Content-Type: application/octet-stream',
- 'Content-Transfer-Encoding: base64',
- ''
- ]))
- payload.write(b'\r\n!\r\n')
- payload.write('--' + client.BOUNDARY + '--\r\n')
+ payload = client.FakePayload(
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="file"; filename="test.txt"',
+ "Content-Type: application/octet-stream",
+ "Content-Transfer-Encoding: base64",
+ "",
+ ]
+ )
+ )
+ payload.write(b"\r\n!\r\n")
+ payload.write("--" + client.BOUNDARY + "--\r\n")
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': '/echo_content/',
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/echo_content/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
- self.assertEqual(response.json()['file'], '')
+ self.assertEqual(response.json()["file"], "")
def test_unicode_file_name(self):
with sys_tempfile.TemporaryDirectory() as temp_dir:
# This file contains Chinese symbols and an accented char in the name.
- with open(os.path.join(temp_dir, UNICODE_FILENAME), 'w+b') as file1:
- file1.write(b'b' * (2 ** 10))
+ with open(os.path.join(temp_dir, UNICODE_FILENAME), "w+b") as file1:
+ file1.write(b"b" * (2**10))
file1.seek(0)
- response = self.client.post('/unicode_name/', {'file_unicode': file1})
+ response = self.client.post("/unicode_name/", {"file_unicode": file1})
self.assertEqual(response.status_code, 200)
def test_unicode_file_name_rfc2231(self):
@@ -174,21 +194,26 @@ class FileUploadTests(TestCase):
(#22971).
"""
payload = client.FakePayload()
- payload.write('\r\n'.join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name="file_unicode"; filename*=UTF-8\'\'%s' % quote(UNICODE_FILENAME),
- 'Content-Type: application/octet-stream',
- '',
- 'You got pwnd.\r\n',
- '\r\n--' + client.BOUNDARY + '--\r\n'
- ]))
+ payload.write(
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ "Content-Disposition: form-data; name=\"file_unicode\"; filename*=UTF-8''%s"
+ % quote(UNICODE_FILENAME),
+ "Content-Type: application/octet-stream",
+ "",
+ "You got pwnd.\r\n",
+ "\r\n--" + client.BOUNDARY + "--\r\n",
+ ]
+ )
+ )
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': "/unicode_name/",
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/unicode_name/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
self.assertEqual(response.status_code, 200)
@@ -200,66 +225,75 @@ class FileUploadTests(TestCase):
"""
payload = client.FakePayload()
payload.write(
- '\r\n'.join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name*=UTF-8\'\'file_unicode; filename*=UTF-8\'\'%s' % quote(
- UNICODE_FILENAME
- ),
- 'Content-Type: application/octet-stream',
- '',
- 'You got pwnd.\r\n',
- '\r\n--' + client.BOUNDARY + '--\r\n'
- ])
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ "Content-Disposition: form-data; name*=UTF-8''file_unicode; filename*=UTF-8''%s"
+ % quote(UNICODE_FILENAME),
+ "Content-Type: application/octet-stream",
+ "",
+ "You got pwnd.\r\n",
+ "\r\n--" + client.BOUNDARY + "--\r\n",
+ ]
+ )
)
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': "/unicode_name/",
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/unicode_name/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
self.assertEqual(response.status_code, 200)
def test_unicode_file_name_rfc2231_with_double_quotes(self):
payload = client.FakePayload()
- payload.write('\r\n'.join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name="file_unicode"; '
- 'filename*="UTF-8\'\'%s"' % quote(UNICODE_FILENAME),
- 'Content-Type: application/octet-stream',
- '',
- 'You got pwnd.\r\n',
- '\r\n--' + client.BOUNDARY + '--\r\n',
- ]))
+ payload.write(
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="file_unicode"; '
+ "filename*=\"UTF-8''%s\"" % quote(UNICODE_FILENAME),
+ "Content-Type: application/octet-stream",
+ "",
+ "You got pwnd.\r\n",
+ "\r\n--" + client.BOUNDARY + "--\r\n",
+ ]
+ )
+ )
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': '/unicode_name/',
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/unicode_name/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
self.assertEqual(response.status_code, 200)
def test_unicode_name_rfc2231_with_double_quotes(self):
payload = client.FakePayload()
- payload.write('\r\n'.join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name*="UTF-8\'\'file_unicode"; '
- 'filename*="UTF-8\'\'%s"' % quote(UNICODE_FILENAME),
- 'Content-Type: application/octet-stream',
- '',
- 'You got pwnd.\r\n',
- '\r\n--' + client.BOUNDARY + '--\r\n'
- ]))
+ payload.write(
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ "Content-Disposition: form-data; name*=\"UTF-8''file_unicode\"; "
+ "filename*=\"UTF-8''%s\"" % quote(UNICODE_FILENAME),
+ "Content-Type: application/octet-stream",
+ "",
+ "You got pwnd.\r\n",
+ "\r\n--" + client.BOUNDARY + "--\r\n",
+ ]
+ )
+ )
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': '/unicode_name/',
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/unicode_name/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
self.assertEqual(response.status_code, 200)
@@ -270,30 +304,35 @@ class FileUploadTests(TestCase):
sanitization) should be okay.
"""
filenames = [
- '',
+ "",
# Normalized by MultiPartParser.IE_sanitize().
- 'C:\\Windows\\',
+ "C:\\Windows\\",
# Normalized by os.path.basename().
- '/',
- 'ends-with-slash/',
+ "/",
+ "ends-with-slash/",
]
payload = client.FakePayload()
for i, name in enumerate(filenames):
- payload.write('\r\n'.join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
- 'Content-Type: application/octet-stream',
- '',
- 'You got pwnd.\r\n'
- ]))
- payload.write('\r\n--' + client.BOUNDARY + '--\r\n')
+ payload.write(
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="file%s"; filename="%s"'
+ % (i, name),
+ "Content-Type: application/octet-stream",
+ "",
+ "You got pwnd.\r\n",
+ ]
+ )
+ )
+ payload.write("\r\n--" + client.BOUNDARY + "--\r\n")
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': '/echo/',
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/echo/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
self.assertEqual(response.status_code, 200)
@@ -301,30 +340,34 @@ class FileUploadTests(TestCase):
# Empty filenames should be ignored
received = response.json()
for i, name in enumerate(filenames):
- self.assertIsNone(received.get('file%s' % i))
+ self.assertIsNone(received.get("file%s" % i))
def test_non_printable_chars_in_file_names(self):
- file_name = 'non-\x00printable\x00\n_chars.txt\x00'
+ file_name = "non-\x00printable\x00\n_chars.txt\x00"
payload = client.FakePayload()
- payload.write('\r\n'.join([
- '--' + client.BOUNDARY,
- f'Content-Disposition: form-data; name="file"; filename="{file_name}"',
- 'Content-Type: application/octet-stream',
- '',
- 'You got pwnd.\r\n'
- ]))
- payload.write('\r\n--' + client.BOUNDARY + '--\r\n')
+ payload.write(
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ f'Content-Disposition: form-data; name="file"; filename="{file_name}"',
+ "Content-Type: application/octet-stream",
+ "",
+ "You got pwnd.\r\n",
+ ]
+ )
+ )
+ payload.write("\r\n--" + client.BOUNDARY + "--\r\n")
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': '/echo/',
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/echo/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
# Non-printable chars are sanitized.
received = response.json()
- self.assertEqual(received['file'], 'non-printable_chars.txt')
+ self.assertEqual(received["file"], "non-printable_chars.txt")
def test_dangerous_file_names(self):
"""Uploaded file names should be sanitized before ever reaching the view."""
@@ -335,21 +378,26 @@ class FileUploadTests(TestCase):
# trying such an attack.
payload = client.FakePayload()
for i, name in enumerate(CANDIDATE_TRAVERSAL_FILE_NAMES):
- payload.write('\r\n'.join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
- 'Content-Type: application/octet-stream',
- '',
- 'You got pwnd.\r\n'
- ]))
- payload.write('\r\n--' + client.BOUNDARY + '--\r\n')
+ payload.write(
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="file%s"; filename="%s"'
+ % (i, name),
+ "Content-Type: application/octet-stream",
+ "",
+ "You got pwnd.\r\n",
+ ]
+ )
+ )
+ payload.write("\r\n--" + client.BOUNDARY + "--\r\n")
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': "/echo/",
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/echo/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
# The filenames should have been sanitized by the time it got to the view.
@@ -360,84 +408,99 @@ class FileUploadTests(TestCase):
def test_filename_overflow(self):
"""File names over 256 characters (dangerous on some platforms) get fixed up."""
- long_str = 'f' * 300
+ long_str = "f" * 300
cases = [
# field name, filename, expected
- ('long_filename', '%s.txt' % long_str, '%s.txt' % long_str[:251]),
- ('long_extension', 'foo.%s' % long_str, '.%s' % long_str[:254]),
- ('no_extension', long_str, long_str[:255]),
- ('no_filename', '.%s' % long_str, '.%s' % long_str[:254]),
- ('long_everything', '%s.%s' % (long_str, long_str), '.%s' % long_str[:254]),
+ ("long_filename", "%s.txt" % long_str, "%s.txt" % long_str[:251]),
+ ("long_extension", "foo.%s" % long_str, ".%s" % long_str[:254]),
+ ("no_extension", long_str, long_str[:255]),
+ ("no_filename", ".%s" % long_str, ".%s" % long_str[:254]),
+ ("long_everything", "%s.%s" % (long_str, long_str), ".%s" % long_str[:254]),
]
payload = client.FakePayload()
for name, filename, _ in cases:
- payload.write("\r\n".join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name="{}"; filename="{}"',
- 'Content-Type: application/octet-stream',
- '',
- 'Oops.',
- ''
- ]).format(name, filename))
- payload.write('\r\n--' + client.BOUNDARY + '--\r\n')
+ payload.write(
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="{}"; filename="{}"',
+ "Content-Type: application/octet-stream",
+ "",
+ "Oops.",
+ "",
+ ]
+ ).format(name, filename)
+ )
+ payload.write("\r\n--" + client.BOUNDARY + "--\r\n")
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': "/echo/",
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/echo/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
result = response.json()
for name, _, expected in cases:
got = result[name]
- self.assertEqual(expected, got, 'Mismatch for {}'.format(name))
- self.assertLess(len(got), 256,
- "Got a long file name (%s characters)." % len(got))
+ self.assertEqual(expected, got, "Mismatch for {}".format(name))
+ self.assertLess(
+ len(got), 256, "Got a long file name (%s characters)." % len(got)
+ )
def test_file_content(self):
file = tempfile.NamedTemporaryFile
- with file(suffix=".ctype_extra") as no_content_type, file(suffix=".ctype_extra") as simple_file:
- no_content_type.write(b'no content')
+ with file(suffix=".ctype_extra") as no_content_type, file(
+ suffix=".ctype_extra"
+ ) as simple_file:
+ no_content_type.write(b"no content")
no_content_type.seek(0)
- simple_file.write(b'text content')
+ simple_file.write(b"text content")
simple_file.seek(0)
- simple_file.content_type = 'text/plain'
-
- string_io = StringIO('string content')
- bytes_io = BytesIO(b'binary content')
-
- response = self.client.post('/echo_content/', {
- 'no_content_type': no_content_type,
- 'simple_file': simple_file,
- 'string': string_io,
- 'binary': bytes_io,
- })
+ simple_file.content_type = "text/plain"
+
+ string_io = StringIO("string content")
+ bytes_io = BytesIO(b"binary content")
+
+ response = self.client.post(
+ "/echo_content/",
+ {
+ "no_content_type": no_content_type,
+ "simple_file": simple_file,
+ "string": string_io,
+ "binary": bytes_io,
+ },
+ )
received = response.json()
- self.assertEqual(received['no_content_type'], 'no content')
- self.assertEqual(received['simple_file'], 'text content')
- self.assertEqual(received['string'], 'string content')
- self.assertEqual(received['binary'], 'binary content')
+ self.assertEqual(received["no_content_type"], "no content")
+ self.assertEqual(received["simple_file"], "text content")
+ self.assertEqual(received["string"], "string content")
+ self.assertEqual(received["binary"], "binary content")
def test_content_type_extra(self):
"""Uploaded files may have content type parameters available."""
file = tempfile.NamedTemporaryFile
- with file(suffix=".ctype_extra") as no_content_type, file(suffix=".ctype_extra") as simple_file:
- no_content_type.write(b'something')
+ with file(suffix=".ctype_extra") as no_content_type, file(
+ suffix=".ctype_extra"
+ ) as simple_file:
+ no_content_type.write(b"something")
no_content_type.seek(0)
- simple_file.write(b'something')
+ simple_file.write(b"something")
simple_file.seek(0)
- simple_file.content_type = 'text/plain; test-key=test_value'
-
- response = self.client.post('/echo_content_type_extra/', {
- 'no_content_type': no_content_type,
- 'simple_file': simple_file,
- })
+ simple_file.content_type = "text/plain; test-key=test_value"
+
+ response = self.client.post(
+ "/echo_content_type_extra/",
+ {
+ "no_content_type": no_content_type,
+ "simple_file": simple_file,
+ },
+ )
received = response.json()
- self.assertEqual(received['no_content_type'], {})
- self.assertEqual(received['simple_file'], {'test-key': 'test_value'})
+ self.assertEqual(received["no_content_type"], {})
+ self.assertEqual(received["simple_file"], {"test-key": "test_value"})
def test_truncated_multipart_handled_gracefully(self):
"""
@@ -445,22 +508,23 @@ class FileUploadTests(TestCase):
attempt to read beyond the end of the stream, and simply will handle
the part that can be parsed gracefully.
"""
- payload_str = "\r\n".join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name="file"; filename="foo.txt"',
- 'Content-Type: application/octet-stream',
- '',
- 'file contents'
- '--' + client.BOUNDARY + '--',
- '',
- ])
+ payload_str = "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="file"; filename="foo.txt"',
+ "Content-Type: application/octet-stream",
+ "",
+ "file contents" "--" + client.BOUNDARY + "--",
+ "",
+ ]
+ )
payload = client.FakePayload(payload_str[:-10])
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': '/echo/',
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/echo/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
self.assertEqual(self.client.request(**r).json(), {})
@@ -470,11 +534,11 @@ class FileUploadTests(TestCase):
an empty QueryDict.
"""
r = {
- 'CONTENT_LENGTH': 0,
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': '/echo/',
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': client.FakePayload(b''),
+ "CONTENT_LENGTH": 0,
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/echo/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": client.FakePayload(b""),
}
self.assertEqual(self.client.request(**r).json(), {})
@@ -482,34 +546,36 @@ class FileUploadTests(TestCase):
file = tempfile.NamedTemporaryFile
with file() as smallfile, file() as bigfile:
# A small file (under the 5M quota)
- smallfile.write(b'a' * (2 ** 21))
+ smallfile.write(b"a" * (2**21))
smallfile.seek(0)
# A big file (over the quota)
- bigfile.write(b'a' * (10 * 2 ** 20))
+ bigfile.write(b"a" * (10 * 2**20))
bigfile.seek(0)
# Small file posting should work.
- self.assertIn('f', self.client.post('/quota/', {'f': smallfile}).json())
+ self.assertIn("f", self.client.post("/quota/", {"f": smallfile}).json())
# Large files don't go through.
- self.assertNotIn('f', self.client.post("/quota/", {'f': bigfile}).json())
+ self.assertNotIn("f", self.client.post("/quota/", {"f": bigfile}).json())
def test_broken_custom_upload_handler(self):
with tempfile.NamedTemporaryFile() as file:
- file.write(b'a' * (2 ** 21))
+ file.write(b"a" * (2**21))
file.seek(0)
- msg = 'You cannot alter upload handlers after the upload has been processed.'
+ msg = (
+ "You cannot alter upload handlers after the upload has been processed."
+ )
with self.assertRaisesMessage(AttributeError, msg):
- self.client.post('/quota/broken/', {'f': file})
+ self.client.post("/quota/broken/", {"f": file})
def test_stop_upload_temporary_file_handler(self):
with tempfile.NamedTemporaryFile() as temp_file:
- temp_file.write(b'a')
+ temp_file.write(b"a")
temp_file.seek(0)
- response = self.client.post('/temp_file/stop_upload/', {'file': temp_file})
- temp_path = response.json()['temp_path']
+ response = self.client.post("/temp_file/stop_upload/", {"file": temp_file})
+ temp_path = response.json()["temp_path"]
self.assertIs(os.path.exists(temp_path), False)
def test_upload_interrupted_temporary_file_handler(self):
@@ -523,74 +589,83 @@ class FileUploadTests(TestCase):
return
with tempfile.NamedTemporaryFile() as temp_file:
- temp_file.write(b'a')
+ temp_file.write(b"a")
temp_file.seek(0)
with mock.patch(
- 'django.http.multipartparser.Parser',
+ "django.http.multipartparser.Parser",
MockedParser,
):
response = self.client.post(
- '/temp_file/upload_interrupted/',
- {'file': temp_file},
+ "/temp_file/upload_interrupted/",
+ {"file": temp_file},
)
- temp_path = response.json()['temp_path']
+ temp_path = response.json()["temp_path"]
self.assertIs(os.path.exists(temp_path), False)
def test_fileupload_getlist(self):
file = tempfile.NamedTemporaryFile
with file() as file1, file() as file2, file() as file2a:
- file1.write(b'a' * (2 ** 23))
+ file1.write(b"a" * (2**23))
file1.seek(0)
- file2.write(b'a' * (2 * 2 ** 18))
+ file2.write(b"a" * (2 * 2**18))
file2.seek(0)
- file2a.write(b'a' * (5 * 2 ** 20))
+ file2a.write(b"a" * (5 * 2**20))
file2a.seek(0)
- response = self.client.post('/getlist_count/', {
- 'file1': file1,
- 'field1': 'test',
- 'field2': 'test3',
- 'field3': 'test5',
- 'field4': 'test6',
- 'field5': 'test7',
- 'file2': (file2, file2a)
- })
+ response = self.client.post(
+ "/getlist_count/",
+ {
+ "file1": file1,
+ "field1": "test",
+ "field2": "test3",
+ "field3": "test5",
+ "field4": "test6",
+ "field5": "test7",
+ "file2": (file2, file2a),
+ },
+ )
got = response.json()
- self.assertEqual(got.get('file1'), 1)
- self.assertEqual(got.get('file2'), 2)
+ self.assertEqual(got.get("file1"), 1)
+ self.assertEqual(got.get("file2"), 2)
def test_fileuploads_closed_at_request_end(self):
file = tempfile.NamedTemporaryFile
with file() as f1, file() as f2a, file() as f2b:
- response = self.client.post('/fd_closing/t/', {
- 'file': f1,
- 'file2': (f2a, f2b),
- })
+ response = self.client.post(
+ "/fd_closing/t/",
+ {
+ "file": f1,
+ "file2": (f2a, f2b),
+ },
+ )
request = response.wsgi_request
# The files were parsed.
- self.assertTrue(hasattr(request, '_files'))
+ self.assertTrue(hasattr(request, "_files"))
- file = request._files['file']
+ file = request._files["file"]
self.assertTrue(file.closed)
- files = request._files.getlist('file2')
+ files = request._files.getlist("file2")
self.assertTrue(files[0].closed)
self.assertTrue(files[1].closed)
def test_no_parsing_triggered_by_fd_closing(self):
file = tempfile.NamedTemporaryFile
with file() as f1, file() as f2a, file() as f2b:
- response = self.client.post('/fd_closing/f/', {
- 'file': f1,
- 'file2': (f2a, f2b),
- })
+ response = self.client.post(
+ "/fd_closing/f/",
+ {
+ "file": f1,
+ "file2": (f2a, f2b),
+ },
+ )
request = response.wsgi_request
# The fd closing logic doesn't trigger parsing of the stream
- self.assertFalse(hasattr(request, '_files'))
+ self.assertFalse(hasattr(request, "_files"))
def test_file_error_blocking(self):
"""
@@ -599,8 +674,10 @@ class FileUploadTests(TestCase):
access POST while handling an error in parsing POST. This shouldn't
cause an infinite loop!
"""
+
class POSTAccessingHandler(client.ClientHandler):
"""A handler that'll access POST during an exception."""
+
def handle_uncaught_exception(self, request, resolver, exc_info):
ret = super().handle_uncaught_exception(request, resolver, exc_info)
request.POST # evaluate
@@ -611,25 +688,25 @@ class FileUploadTests(TestCase):
# this test would fail. So we need to know exactly what kind of error
# it raises when there is an attempt to read more than the available bytes:
try:
- client.FakePayload(b'a').read(2)
+ client.FakePayload(b"a").read(2)
except Exception as err:
reference_error = err
# install the custom handler that tries to access request.POST
self.client.handler = POSTAccessingHandler()
- with open(__file__, 'rb') as fp:
+ with open(__file__, "rb") as fp:
post_data = {
- 'name': 'Ringo',
- 'file_field': fp,
+ "name": "Ringo",
+ "file_field": fp,
}
try:
- self.client.post('/upload_errors/', post_data)
+ self.client.post("/upload_errors/", post_data)
except reference_error.__class__ as err:
self.assertNotEqual(
str(err),
str(reference_error),
- "Caught a repeated exception that'll cause an infinite loop in file uploads."
+ "Caught a repeated exception that'll cause an infinite loop in file uploads.",
)
except Exception as err:
# CustomUploadError is the error that should have been raised
@@ -643,64 +720,66 @@ class FileUploadTests(TestCase):
# Synthesize the contents of a file upload with a mixed case filename
# so we don't have to carry such a file in the Django tests source code
# tree.
- vars = {'boundary': 'oUrBoUnDaRyStRiNg'}
+ vars = {"boundary": "oUrBoUnDaRyStRiNg"}
post_data = [
- '--%(boundary)s',
+ "--%(boundary)s",
'Content-Disposition: form-data; name="file_field"; filename="MiXeD_cAsE.txt"',
- 'Content-Type: application/octet-stream',
- '',
- 'file contents\n',
- '--%(boundary)s--\r\n',
+ "Content-Type: application/octet-stream",
+ "",
+ "file contents\n",
+ "--%(boundary)s--\r\n",
]
response = self.client.post(
- '/filename_case/',
- '\r\n'.join(post_data) % vars,
- 'multipart/form-data; boundary=%(boundary)s' % vars
+ "/filename_case/",
+ "\r\n".join(post_data) % vars,
+ "multipart/form-data; boundary=%(boundary)s" % vars,
)
self.assertEqual(response.status_code, 200)
id = int(response.content)
obj = FileModel.objects.get(pk=id)
# The name of the file uploaded and the file stored in the server-side
# shouldn't differ.
- self.assertEqual(os.path.basename(obj.testfile.path), 'MiXeD_cAsE.txt')
+ self.assertEqual(os.path.basename(obj.testfile.path), "MiXeD_cAsE.txt")
def test_filename_traversal_upload(self):
os.makedirs(UPLOAD_TO, exist_ok=True)
tests = [
- '..&#x2F;test.txt',
- '..&sol;test.txt',
+ "..&#x2F;test.txt",
+ "..&sol;test.txt",
]
for file_name in tests:
with self.subTest(file_name=file_name):
payload = client.FakePayload()
payload.write(
- '\r\n'.join([
- '--' + client.BOUNDARY,
- 'Content-Disposition: form-data; name="my_file"; '
- 'filename="%s";' % file_name,
- 'Content-Type: text/plain',
- '',
- 'file contents.\r\n',
- '\r\n--' + client.BOUNDARY + '--\r\n',
- ]),
+ "\r\n".join(
+ [
+ "--" + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="my_file"; '
+ 'filename="%s";' % file_name,
+ "Content-Type: text/plain",
+ "",
+ "file contents.\r\n",
+ "\r\n--" + client.BOUNDARY + "--\r\n",
+ ]
+ ),
)
r = {
- 'CONTENT_LENGTH': len(payload),
- 'CONTENT_TYPE': client.MULTIPART_CONTENT,
- 'PATH_INFO': '/upload_traversal/',
- 'REQUEST_METHOD': 'POST',
- 'wsgi.input': payload,
+ "CONTENT_LENGTH": len(payload),
+ "CONTENT_TYPE": client.MULTIPART_CONTENT,
+ "PATH_INFO": "/upload_traversal/",
+ "REQUEST_METHOD": "POST",
+ "wsgi.input": payload,
}
response = self.client.request(**r)
result = response.json()
self.assertEqual(response.status_code, 200)
- self.assertEqual(result['file_name'], 'test.txt')
+ self.assertEqual(result["file_name"], "test.txt")
self.assertIs(
- os.path.exists(os.path.join(MEDIA_ROOT, 'test.txt')),
+ os.path.exists(os.path.join(MEDIA_ROOT, "test.txt")),
False,
)
self.assertIs(
- os.path.exists(os.path.join(UPLOAD_TO, 'test.txt')),
+ os.path.exists(os.path.join(UPLOAD_TO, "test.txt")),
True,
)
@@ -711,6 +790,7 @@ class DirectoryCreationTests(SimpleTestCase):
Tests for error handling during directory creation
via _save_FIELD_file (ticket #6450)
"""
+
@classmethod
def setUpClass(cls):
super().setUpClass()
@@ -720,85 +800,128 @@ class DirectoryCreationTests(SimpleTestCase):
def setUp(self):
self.obj = FileModel()
- @unittest.skipIf(sys.platform == 'win32', "Python on Windows doesn't have working os.chmod().")
+ @unittest.skipIf(
+ sys.platform == "win32", "Python on Windows doesn't have working os.chmod()."
+ )
def test_readonly_root(self):
"""Permission errors are not swallowed"""
os.chmod(MEDIA_ROOT, 0o500)
self.addCleanup(os.chmod, MEDIA_ROOT, 0o700)
with self.assertRaises(PermissionError):
- self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'), save=False)
+ self.obj.testfile.save(
+ "foo.txt", SimpleUploadedFile("foo.txt", b"x"), save=False
+ )
def test_not_a_directory(self):
# Create a file with the upload directory name
- open(UPLOAD_TO, 'wb').close()
+ open(UPLOAD_TO, "wb").close()
self.addCleanup(os.remove, UPLOAD_TO)
- msg = '%s exists and is not a directory.' % UPLOAD_TO
+ msg = "%s exists and is not a directory." % UPLOAD_TO
with self.assertRaisesMessage(FileExistsError, msg):
- with SimpleUploadedFile('foo.txt', b'x') as file:
- self.obj.testfile.save('foo.txt', file, save=False)
+ with SimpleUploadedFile("foo.txt", b"x") as file:
+ self.obj.testfile.save("foo.txt", file, save=False)
class MultiParserTests(SimpleTestCase):
-
def test_empty_upload_handlers(self):
# We're not actually parsing here; just checking if the parser properly
# instantiates with empty upload handlers.
- MultiPartParser({
- 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
- 'CONTENT_LENGTH': '1'
- }, StringIO('x'), [], 'utf-8')
+ MultiPartParser(
+ {
+ "CONTENT_TYPE": "multipart/form-data; boundary=_foo",
+ "CONTENT_LENGTH": "1",
+ },
+ StringIO("x"),
+ [],
+ "utf-8",
+ )
def test_invalid_content_type(self):
- with self.assertRaisesMessage(MultiPartParserError, 'Invalid Content-Type: text/plain'):
- MultiPartParser({
- 'CONTENT_TYPE': 'text/plain',
- 'CONTENT_LENGTH': '1',
- }, StringIO('x'), [], 'utf-8')
+ with self.assertRaisesMessage(
+ MultiPartParserError, "Invalid Content-Type: text/plain"
+ ):
+ MultiPartParser(
+ {
+ "CONTENT_TYPE": "text/plain",
+ "CONTENT_LENGTH": "1",
+ },
+ StringIO("x"),
+ [],
+ "utf-8",
+ )
def test_negative_content_length(self):
- with self.assertRaisesMessage(MultiPartParserError, 'Invalid content length: -1'):
- MultiPartParser({
- 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
- 'CONTENT_LENGTH': -1,
- }, StringIO('x'), [], 'utf-8')
+ with self.assertRaisesMessage(
+ MultiPartParserError, "Invalid content length: -1"
+ ):
+ MultiPartParser(
+ {
+ "CONTENT_TYPE": "multipart/form-data; boundary=_foo",
+ "CONTENT_LENGTH": -1,
+ },
+ StringIO("x"),
+ [],
+ "utf-8",
+ )
def test_bad_type_content_length(self):
- multipart_parser = MultiPartParser({
- 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
- 'CONTENT_LENGTH': 'a',
- }, StringIO('x'), [], 'utf-8')
+ multipart_parser = MultiPartParser(
+ {
+ "CONTENT_TYPE": "multipart/form-data; boundary=_foo",
+ "CONTENT_LENGTH": "a",
+ },
+ StringIO("x"),
+ [],
+ "utf-8",
+ )
self.assertEqual(multipart_parser._content_length, 0)
def test_sanitize_file_name(self):
- parser = MultiPartParser({
- 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
- 'CONTENT_LENGTH': '1'
- }, StringIO('x'), [], 'utf-8')
+ parser = MultiPartParser(
+ {
+ "CONTENT_TYPE": "multipart/form-data; boundary=_foo",
+ "CONTENT_LENGTH": "1",
+ },
+ StringIO("x"),
+ [],
+ "utf-8",
+ )
for file_name in CANDIDATE_TRAVERSAL_FILE_NAMES:
with self.subTest(file_name=file_name):
- self.assertEqual(parser.sanitize_file_name(file_name), 'hax0rd.txt')
+ self.assertEqual(parser.sanitize_file_name(file_name), "hax0rd.txt")
def test_sanitize_invalid_file_name(self):
- parser = MultiPartParser({
- 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
- 'CONTENT_LENGTH': '1',
- }, StringIO('x'), [], 'utf-8')
+ parser = MultiPartParser(
+ {
+ "CONTENT_TYPE": "multipart/form-data; boundary=_foo",
+ "CONTENT_LENGTH": "1",
+ },
+ StringIO("x"),
+ [],
+ "utf-8",
+ )
for file_name in CANDIDATE_INVALID_FILE_NAMES:
with self.subTest(file_name=file_name):
self.assertIsNone(parser.sanitize_file_name(file_name))
def test_rfc2231_parsing(self):
test_data = (
- (b"Content-Type: application/x-stuff; title*=us-ascii'en-us'This%20is%20%2A%2A%2Afun%2A%2A%2A",
- "This is ***fun***"),
- (b"Content-Type: application/x-stuff; title*=UTF-8''foo-%c3%a4.html",
- "foo-ä.html"),
- (b"Content-Type: application/x-stuff; title*=iso-8859-1''foo-%E4.html",
- "foo-ä.html"),
+ (
+ b"Content-Type: application/x-stuff; title*=us-ascii'en-us'This%20is%20%2A%2A%2Afun%2A%2A%2A",
+ "This is ***fun***",
+ ),
+ (
+ b"Content-Type: application/x-stuff; title*=UTF-8''foo-%c3%a4.html",
+ "foo-ä.html",
+ ),
+ (
+ b"Content-Type: application/x-stuff; title*=iso-8859-1''foo-%E4.html",
+ "foo-ä.html",
+ ),
)
for raw_line, expected_title in test_data:
parsed = parse_header(raw_line)
- self.assertEqual(parsed[1]['title'], expected_title)
+ self.assertEqual(parsed[1]["title"], expected_title)
def test_rfc2231_wrong_title(self):
"""
@@ -806,13 +929,13 @@ class MultiParserTests(SimpleTestCase):
Parsing should not crash (#24209).
"""
test_data = (
- (b"Content-Type: application/x-stuff; title*='This%20is%20%2A%2A%2Afun%2A%2A%2A",
- b"'This%20is%20%2A%2A%2Afun%2A%2A%2A"),
- (b"Content-Type: application/x-stuff; title*='foo.html",
- b"'foo.html"),
- (b"Content-Type: application/x-stuff; title*=bar.html",
- b"bar.html"),
+ (
+ b"Content-Type: application/x-stuff; title*='This%20is%20%2A%2A%2Afun%2A%2A%2A",
+ b"'This%20is%20%2A%2A%2Afun%2A%2A%2A",
+ ),
+ (b"Content-Type: application/x-stuff; title*='foo.html", b"'foo.html"),
+ (b"Content-Type: application/x-stuff; title*=bar.html", b"bar.html"),
)
for raw_line, expected_title in test_data:
parsed = parse_header(raw_line)
- self.assertEqual(parsed[1]['title'], expected_title)
+ self.assertEqual(parsed[1]["title"], expected_title)
diff --git a/tests/file_uploads/uploadhandler.py b/tests/file_uploads/uploadhandler.py
index eac6de037c..eecbc6dc9b 100644
--- a/tests/file_uploads/uploadhandler.py
+++ b/tests/file_uploads/uploadhandler.py
@@ -5,7 +5,9 @@ import os
from tempfile import NamedTemporaryFile
from django.core.files.uploadhandler import (
- FileUploadHandler, StopUpload, TemporaryFileUploadHandler,
+ FileUploadHandler,
+ StopUpload,
+ TemporaryFileUploadHandler,
)
@@ -15,7 +17,7 @@ class QuotaUploadHandler(FileUploadHandler):
(5MB) is uploaded.
"""
- QUOTA = 5 * 2 ** 20 # 5 MB
+ QUOTA = 5 * 2**20 # 5 MB
def __init__(self, request=None):
super().__init__(request)
@@ -33,6 +35,7 @@ class QuotaUploadHandler(FileUploadHandler):
class StopUploadTemporaryFileHandler(TemporaryFileUploadHandler):
"""A handler that raises a StopUpload exception."""
+
def receive_data_chunk(self, raw_data, start):
raise StopUpload()
@@ -43,12 +46,14 @@ class CustomUploadError(Exception):
class ErroringUploadHandler(FileUploadHandler):
"""A handler that raises an exception."""
+
def receive_data_chunk(self, raw_data, start):
raise CustomUploadError("Oops!")
class TraversalUploadHandler(FileUploadHandler):
"""A handler with potential directory-traversal vulnerability."""
+
def __init__(self, request=None):
from .views import UPLOAD_TO
@@ -58,19 +63,28 @@ class TraversalUploadHandler(FileUploadHandler):
def file_complete(self, file_size):
self.file.seek(0)
self.file.size = file_size
- with open(os.path.join(self.upload_dir, self.file_name), 'wb') as fp:
+ with open(os.path.join(self.upload_dir, self.file_name), "wb") as fp:
fp.write(self.file.read())
return self.file
def new_file(
- self, field_name, file_name, content_type, content_length, charset=None,
+ self,
+ field_name,
+ file_name,
+ content_type,
+ content_length,
+ charset=None,
content_type_extra=None,
):
super().new_file(
- file_name, file_name, content_length, content_length, charset,
+ file_name,
+ file_name,
+ content_length,
+ content_length,
+ charset,
content_type_extra,
)
- self.file = NamedTemporaryFile(suffix='.upload', dir=self.upload_dir)
+ self.file = NamedTemporaryFile(suffix=".upload", dir=self.upload_dir)
def receive_data_chunk(self, raw_data, start):
self.file.write(raw_data)
diff --git a/tests/file_uploads/urls.py b/tests/file_uploads/urls.py
index 9df5432403..7cb54f3be0 100644
--- a/tests/file_uploads/urls.py
+++ b/tests/file_uploads/urls.py
@@ -3,19 +3,19 @@ from django.urls import path, re_path
from . import views
urlpatterns = [
- path('upload/', views.file_upload_view),
- path('upload_traversal/', views.file_upload_traversal_view),
- path('verify/', views.file_upload_view_verify),
- path('unicode_name/', views.file_upload_unicode_name),
- path('echo/', views.file_upload_echo),
- path('echo_content_type_extra/', views.file_upload_content_type_extra),
- path('echo_content/', views.file_upload_echo_content),
- path('quota/', views.file_upload_quota),
- path('quota/broken/', views.file_upload_quota_broken),
- path('getlist_count/', views.file_upload_getlist_count),
- path('upload_errors/', views.file_upload_errors),
- path('temp_file/stop_upload/', views.file_stop_upload_temporary_file),
- path('temp_file/upload_interrupted/', views.file_upload_interrupted_temporary_file),
- path('filename_case/', views.file_upload_filename_case_view),
- re_path(r'^fd_closing/(?P<access>t|f)/$', views.file_upload_fd_closing),
+ path("upload/", views.file_upload_view),
+ path("upload_traversal/", views.file_upload_traversal_view),
+ path("verify/", views.file_upload_view_verify),
+ path("unicode_name/", views.file_upload_unicode_name),
+ path("echo/", views.file_upload_echo),
+ path("echo_content_type_extra/", views.file_upload_content_type_extra),
+ path("echo_content/", views.file_upload_echo_content),
+ path("quota/", views.file_upload_quota),
+ path("quota/broken/", views.file_upload_quota_broken),
+ path("getlist_count/", views.file_upload_getlist_count),
+ path("upload_errors/", views.file_upload_errors),
+ path("temp_file/stop_upload/", views.file_stop_upload_temporary_file),
+ path("temp_file/upload_interrupted/", views.file_upload_interrupted_temporary_file),
+ path("filename_case/", views.file_upload_filename_case_view),
+ re_path(r"^fd_closing/(?P<access>t|f)/$", views.file_upload_fd_closing),
]
diff --git a/tests/file_uploads/views.py b/tests/file_uploads/views.py
index 50de6238b4..d5efbba3ce 100644
--- a/tests/file_uploads/views.py
+++ b/tests/file_uploads/views.py
@@ -8,7 +8,9 @@ from django.http import HttpResponse, HttpResponseServerError, JsonResponse
from .models import FileModel
from .tests import UNICODE_FILENAME, UPLOAD_TO
from .uploadhandler import (
- ErroringUploadHandler, QuotaUploadHandler, StopUploadTemporaryFileHandler,
+ ErroringUploadHandler,
+ QuotaUploadHandler,
+ StopUploadTemporaryFileHandler,
TraversalUploadHandler,
)
@@ -19,10 +21,12 @@ def file_upload_view(request):
"""
form_data = request.POST.copy()
form_data.update(request.FILES)
- if isinstance(form_data.get('file_field'), UploadedFile) and isinstance(form_data['name'], str):
+ if isinstance(form_data.get("file_field"), UploadedFile) and isinstance(
+ form_data["name"], str
+ ):
# If a file is posted, the dummy client should only post the file name,
# not the full path.
- if os.path.dirname(form_data['file_field'].name) != '':
+ if os.path.dirname(form_data["file_field"].name) != "":
return HttpResponseServerError()
return HttpResponse()
else:
@@ -37,11 +41,11 @@ def file_upload_view_verify(request):
form_data.update(request.FILES)
for key, value in form_data.items():
- if key.endswith('_hash'):
+ if key.endswith("_hash"):
continue
- if key + '_hash' not in form_data:
+ if key + "_hash" not in form_data:
continue
- submitted_hash = form_data[key + '_hash']
+ submitted_hash = form_data[key + "_hash"]
if isinstance(value, UploadedFile):
new_hash = hashlib.sha1(value.read()).hexdigest()
else:
@@ -50,7 +54,7 @@ def file_upload_view_verify(request):
return HttpResponseServerError()
# Adding large file to the database should succeed
- largefile = request.FILES['file_field2']
+ largefile = request.FILES["file_field2"]
obj = FileModel()
obj.testfile.save(largefile.name, largefile)
@@ -59,13 +63,13 @@ def file_upload_view_verify(request):
def file_upload_unicode_name(request):
# Check to see if Unicode name came through properly.
- if not request.FILES['file_unicode'].name.endswith(UNICODE_FILENAME):
+ if not request.FILES["file_unicode"].name.endswith(UNICODE_FILENAME):
return HttpResponseServerError()
# Check to make sure the exotic characters are preserved even
# through file save.
- uni_named_file = request.FILES['file_unicode']
+ uni_named_file = request.FILES["file_unicode"]
FileModel.objects.create(testfile=uni_named_file)
- full_name = '%s/%s' % (UPLOAD_TO, uni_named_file.name)
+ full_name = "%s/%s" % (UPLOAD_TO, uni_named_file.name)
return HttpResponse() if os.path.exists(full_name) else HttpResponseServerError()
@@ -81,9 +85,11 @@ def file_upload_echo_content(request):
"""
Simple view to echo back the content of uploaded files for tests.
"""
+
def read_and_close(f):
with f:
return f.read().decode()
+
r = {k: read_and_close(f) for k, f in request.FILES.items()}
return JsonResponse(r)
@@ -110,7 +116,7 @@ def file_stop_upload_temporary_file(request):
request.upload_handlers.pop(2)
request.FILES # Trigger file parsing.
return JsonResponse(
- {'temp_path': request.upload_handlers[0].file.temporary_file_path()},
+ {"temp_path": request.upload_handlers[0].file.temporary_file_path()},
)
@@ -119,7 +125,7 @@ def file_upload_interrupted_temporary_file(request):
request.upload_handlers.pop(2)
request.FILES # Trigger file parsing.
return JsonResponse(
- {'temp_path': request.upload_handlers[0].file.temporary_file_path()},
+ {"temp_path": request.upload_handlers[0].file.temporary_file_path()},
)
@@ -143,10 +149,10 @@ def file_upload_filename_case_view(request):
"""
Check adding the file to the database will preserve the filename case.
"""
- file = request.FILES['file_field']
+ file = request.FILES["file_field"]
obj = FileModel()
obj.testfile.save(file.name, file)
- return HttpResponse('%d' % obj.pk)
+ return HttpResponse("%d" % obj.pk)
def file_upload_content_type_extra(request):
@@ -155,12 +161,14 @@ def file_upload_content_type_extra(request):
"""
params = {}
for file_name, uploadedfile in request.FILES.items():
- params[file_name] = {k: v.decode() for k, v in uploadedfile.content_type_extra.items()}
+ params[file_name] = {
+ k: v.decode() for k, v in uploadedfile.content_type_extra.items()
+ }
return JsonResponse(params)
def file_upload_fd_closing(request, access):
- if access == 't':
+ if access == "t":
request.FILES # Trigger file parsing.
return HttpResponse()
@@ -169,5 +177,5 @@ def file_upload_traversal_view(request):
request.upload_handlers.insert(0, TraversalUploadHandler())
request.FILES # Trigger file parsing.
return JsonResponse(
- {'file_name': request.upload_handlers[0].file_name},
+ {"file_name": request.upload_handlers[0].file_name},
)