summaryrefslogtreecommitdiff
path: root/tests/file_uploads/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/file_uploads/tests.py')
-rw-r--r--tests/file_uploads/tests.py415
1 files changed, 415 insertions, 0 deletions
diff --git a/tests/file_uploads/tests.py b/tests/file_uploads/tests.py
new file mode 100644
index 0000000000..45e7342abd
--- /dev/null
+++ b/tests/file_uploads/tests.py
@@ -0,0 +1,415 @@
+#! -*- coding: utf-8 -*-
+from __future__ import absolute_import, unicode_literals
+
+import base64
+import errno
+import hashlib
+import json
+import os
+import shutil
+import tempfile as sys_tempfile
+
+from django.core.files import temp as tempfile
+from django.core.files.uploadedfile import SimpleUploadedFile
+from django.http.multipartparser import MultiPartParser
+from django.test import TestCase, client
+from django.test.utils import override_settings
+from django.utils.encoding import force_bytes
+from django.utils.six import StringIO
+from django.utils import unittest
+
+from . import uploadhandler
+from .models import FileModel
+
+
+UNICODE_FILENAME = 'test-0123456789_中文_Orléans.jpg'
+MEDIA_ROOT = sys_tempfile.mkdtemp()
+UPLOAD_TO = os.path.join(MEDIA_ROOT, 'test_upload')
+
+@override_settings(MEDIA_ROOT=MEDIA_ROOT)
+class FileUploadTests(TestCase):
+ @classmethod
+ def setUpClass(cls):
+ if not os.path.isdir(MEDIA_ROOT):
+ os.makedirs(MEDIA_ROOT)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(MEDIA_ROOT)
+
+ def test_simple_upload(self):
+ with open(__file__, 'rb') as fp:
+ post_data = {
+ 'name': 'Ringo',
+ 'file_field': fp,
+ }
+ response = self.client.post('/file_uploads/upload/', post_data)
+ self.assertEqual(response.status_code, 200)
+
+ def test_large_upload(self):
+ tdir = tempfile.gettempdir()
+
+ file1 = tempfile.NamedTemporaryFile(suffix=".file1", dir=tdir)
+ file1.write(b'a' * (2 ** 21))
+ file1.seek(0)
+
+ file2 = tempfile.NamedTemporaryFile(suffix=".file2", dir=tdir)
+ file2.write(b'a' * (10 * 2 ** 20))
+ file2.seek(0)
+
+ post_data = {
+ 'name': 'Ringo',
+ 'file_field1': file1,
+ 'file_field2': file2,
+ }
+
+ for key in list(post_data):
+ try:
+ post_data[key + '_hash'] = hashlib.sha1(post_data[key].read()).hexdigest()
+ post_data[key].seek(0)
+ except AttributeError:
+ post_data[key + '_hash'] = hashlib.sha1(force_bytes(post_data[key])).hexdigest()
+
+ response = self.client.post('/file_uploads/verify/', post_data)
+
+ self.assertEqual(response.status_code, 200)
+
+ def _test_base64_upload(self, content):
+ payload = client.FakePayload("\r\n".join([
+ '--' + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="file"; filename="test.txt"',
+ 'Content-Type: application/octet-stream',
+ 'Content-Transfer-Encoding: base64',
+ '',]))
+ payload.write(b"\r\n" + base64.b64encode(force_bytes(content)) + b"\r\n")
+ payload.write('--' + client.BOUNDARY + '--\r\n')
+ r = {
+ 'CONTENT_LENGTH': len(payload),
+ 'CONTENT_TYPE': client.MULTIPART_CONTENT,
+ 'PATH_INFO': "/file_uploads/echo_content/",
+ 'REQUEST_METHOD': 'POST',
+ 'wsgi.input': payload,
+ }
+ response = self.client.request(**r)
+ received = json.loads(response.content.decode('utf-8'))
+
+ self.assertEqual(received['file'], content)
+
+ def test_base64_upload(self):
+ self._test_base64_upload("This data will be transmitted base64-encoded.")
+
+ def test_big_base64_upload(self):
+ self._test_base64_upload("Big data" * 68000) # > 512Kb
+
+ def test_unicode_file_name(self):
+ tdir = sys_tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tdir, True)
+
+ # This file contains chinese symbols and an accented char in the name.
+ with open(os.path.join(tdir, UNICODE_FILENAME), 'w+b') as file1:
+ file1.write(b'b' * (2 ** 10))
+ file1.seek(0)
+
+ post_data = {
+ 'file_unicode': file1,
+ }
+
+ response = self.client.post('/file_uploads/unicode_name/', post_data)
+
+ self.assertEqual(response.status_code, 200)
+
+ def test_dangerous_file_names(self):
+ """Uploaded file names should be sanitized before ever reaching the view."""
+ # This test simulates possible directory traversal attacks by a
+ # malicious uploader We have to do some monkeybusiness here to construct
+ # a malicious payload with an invalid file name (containing os.sep or
+ # os.pardir). This similar to what an attacker would need to do when
+ # trying such an attack.
+ scary_file_names = [
+ "/tmp/hax0rd.txt", # Absolute path, *nix-style.
+ "C:\\Windows\\hax0rd.txt", # Absolute path, win-syle.
+ "C:/Windows/hax0rd.txt", # Absolute path, broken-style.
+ "\\tmp\\hax0rd.txt", # Absolute path, broken in a different way.
+ "/tmp\\hax0rd.txt", # Absolute path, broken by mixing.
+ "subdir/hax0rd.txt", # Descendant path, *nix-style.
+ "subdir\\hax0rd.txt", # Descendant path, win-style.
+ "sub/dir\\hax0rd.txt", # Descendant path, mixed.
+ "../../hax0rd.txt", # Relative path, *nix-style.
+ "..\\..\\hax0rd.txt", # Relative path, win-style.
+ "../..\\hax0rd.txt" # Relative path, mixed.
+ ]
+
+ payload = client.FakePayload()
+ for i, name in enumerate(scary_file_names):
+ payload.write('\r\n'.join([
+ '--' + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
+ 'Content-Type: application/octet-stream',
+ '',
+ 'You got pwnd.\r\n'
+ ]))
+ payload.write('\r\n--' + client.BOUNDARY + '--\r\n')
+
+ r = {
+ 'CONTENT_LENGTH': len(payload),
+ 'CONTENT_TYPE': client.MULTIPART_CONTENT,
+ 'PATH_INFO': "/file_uploads/echo/",
+ 'REQUEST_METHOD': 'POST',
+ 'wsgi.input': payload,
+ }
+ response = self.client.request(**r)
+
+ # The filenames should have been sanitized by the time it got to the view.
+ recieved = json.loads(response.content.decode('utf-8'))
+ for i, name in enumerate(scary_file_names):
+ got = recieved["file%s" % i]
+ self.assertEqual(got, "hax0rd.txt")
+
+ def test_filename_overflow(self):
+ """File names over 256 characters (dangerous on some platforms) get fixed up."""
+ name = "%s.txt" % ("f"*500)
+ payload = client.FakePayload("\r\n".join([
+ '--' + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="file"; filename="%s"' % name,
+ 'Content-Type: application/octet-stream',
+ '',
+ 'Oops.'
+ '--' + client.BOUNDARY + '--',
+ '',
+ ]))
+ r = {
+ 'CONTENT_LENGTH': len(payload),
+ 'CONTENT_TYPE': client.MULTIPART_CONTENT,
+ 'PATH_INFO': "/file_uploads/echo/",
+ 'REQUEST_METHOD': 'POST',
+ 'wsgi.input': payload,
+ }
+ got = json.loads(self.client.request(**r).content.decode('utf-8'))
+ self.assertTrue(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file']))
+
+ def test_truncated_multipart_handled_gracefully(self):
+ """
+ If passed an incomplete multipart message, MultiPartParser does not
+ attempt to read beyond the end of the stream, and simply will handle
+ the part that can be parsed gracefully.
+ """
+ payload_str = "\r\n".join([
+ '--' + client.BOUNDARY,
+ 'Content-Disposition: form-data; name="file"; filename="foo.txt"',
+ 'Content-Type: application/octet-stream',
+ '',
+ 'file contents'
+ '--' + client.BOUNDARY + '--',
+ '',
+ ])
+ payload = client.FakePayload(payload_str[:-10])
+ r = {
+ 'CONTENT_LENGTH': len(payload),
+ 'CONTENT_TYPE': client.MULTIPART_CONTENT,
+ 'PATH_INFO': '/file_uploads/echo/',
+ 'REQUEST_METHOD': 'POST',
+ 'wsgi.input': payload,
+ }
+ got = json.loads(self.client.request(**r).content.decode('utf-8'))
+ self.assertEqual(got, {})
+
+ def test_empty_multipart_handled_gracefully(self):
+ """
+ If passed an empty multipart message, MultiPartParser will return
+ an empty QueryDict.
+ """
+ r = {
+ 'CONTENT_LENGTH': 0,
+ 'CONTENT_TYPE': client.MULTIPART_CONTENT,
+ 'PATH_INFO': '/file_uploads/echo/',
+ 'REQUEST_METHOD': 'POST',
+ 'wsgi.input': client.FakePayload(b''),
+ }
+ got = json.loads(self.client.request(**r).content.decode('utf-8'))
+ self.assertEqual(got, {})
+
+ def test_custom_upload_handler(self):
+ # A small file (under the 5M quota)
+ smallfile = tempfile.NamedTemporaryFile()
+ smallfile.write(b'a' * (2 ** 21))
+ smallfile.seek(0)
+
+ # A big file (over the quota)
+ bigfile = tempfile.NamedTemporaryFile()
+ bigfile.write(b'a' * (10 * 2 ** 20))
+ bigfile.seek(0)
+
+ # Small file posting should work.
+ response = self.client.post('/file_uploads/quota/', {'f': smallfile})
+ got = json.loads(response.content.decode('utf-8'))
+ self.assertTrue('f' in got)
+
+ # Large files don't go through.
+ response = self.client.post("/file_uploads/quota/", {'f': bigfile})
+ got = json.loads(response.content.decode('utf-8'))
+ self.assertTrue('f' not in got)
+
+ def test_broken_custom_upload_handler(self):
+ f = tempfile.NamedTemporaryFile()
+ f.write(b'a' * (2 ** 21))
+ f.seek(0)
+
+ # AttributeError: You cannot alter upload handlers after the upload has been processed.
+ self.assertRaises(
+ AttributeError,
+ self.client.post,
+ '/file_uploads/quota/broken/',
+ {'f': f}
+ )
+
+ def test_fileupload_getlist(self):
+ file1 = tempfile.NamedTemporaryFile()
+ file1.write(b'a' * (2 ** 23))
+ file1.seek(0)
+
+ file2 = tempfile.NamedTemporaryFile()
+ file2.write(b'a' * (2 * 2 ** 18))
+ file2.seek(0)
+
+ file2a = tempfile.NamedTemporaryFile()
+ file2a.write(b'a' * (5 * 2 ** 20))
+ file2a.seek(0)
+
+ response = self.client.post('/file_uploads/getlist_count/', {
+ 'file1': file1,
+ 'field1': 'test',
+ 'field2': 'test3',
+ 'field3': 'test5',
+ 'field4': 'test6',
+ 'field5': 'test7',
+ 'file2': (file2, file2a)
+ })
+ got = json.loads(response.content.decode('utf-8'))
+
+ self.assertEqual(got.get('file1'), 1)
+ self.assertEqual(got.get('file2'), 2)
+
+ def test_file_error_blocking(self):
+ """
+ The server should not block when there are upload errors (bug #8622).
+ This can happen if something -- i.e. an exception handler -- tries to
+ access POST while handling an error in parsing POST. This shouldn't
+ cause an infinite loop!
+ """
+ class POSTAccessingHandler(client.ClientHandler):
+ """A handler that'll access POST during an exception."""
+ def handle_uncaught_exception(self, request, resolver, exc_info):
+ ret = super(POSTAccessingHandler, self).handle_uncaught_exception(request, resolver, exc_info)
+ p = request.POST
+ return ret
+
+ # Maybe this is a little more complicated that it needs to be; but if
+ # the django.test.client.FakePayload.read() implementation changes then
+ # this test would fail. So we need to know exactly what kind of error
+ # it raises when there is an attempt to read more than the available bytes:
+ try:
+ client.FakePayload(b'a').read(2)
+ except Exception as err:
+ reference_error = err
+
+ # install the custom handler that tries to access request.POST
+ self.client.handler = POSTAccessingHandler()
+
+ with open(__file__, 'rb') as fp:
+ post_data = {
+ 'name': 'Ringo',
+ 'file_field': fp,
+ }
+ try:
+ response = self.client.post('/file_uploads/upload_errors/', post_data)
+ except reference_error.__class__ as err:
+ self.assertFalse(
+ str(err) == str(reference_error),
+ "Caught a repeated exception that'll cause an infinite loop in file uploads."
+ )
+ except Exception as err:
+ # CustomUploadError is the error that should have been raised
+ self.assertEqual(err.__class__, uploadhandler.CustomUploadError)
+
+ def test_filename_case_preservation(self):
+ """
+ The storage backend shouldn't mess with the case of the filenames
+ uploaded.
+ """
+ # Synthesize the contents of a file upload with a mixed case filename
+ # so we don't have to carry such a file in the Django tests source code
+ # tree.
+ vars = {'boundary': 'oUrBoUnDaRyStRiNg'}
+ post_data = [
+ '--%(boundary)s',
+ 'Content-Disposition: form-data; name="file_field"; '
+ 'filename="MiXeD_cAsE.txt"',
+ 'Content-Type: application/octet-stream',
+ '',
+ 'file contents\n'
+ '',
+ '--%(boundary)s--\r\n',
+ ]
+ response = self.client.post(
+ '/file_uploads/filename_case/',
+ '\r\n'.join(post_data) % vars,
+ 'multipart/form-data; boundary=%(boundary)s' % vars
+ )
+ self.assertEqual(response.status_code, 200)
+ id = int(response.content)
+ obj = FileModel.objects.get(pk=id)
+ # The name of the file uploaded and the file stored in the server-side
+ # shouldn't differ.
+ self.assertEqual(os.path.basename(obj.testfile.path), 'MiXeD_cAsE.txt')
+
+@override_settings(MEDIA_ROOT=MEDIA_ROOT)
+class DirectoryCreationTests(TestCase):
+ """
+ Tests for error handling during directory creation
+ via _save_FIELD_file (ticket #6450)
+ """
+ @classmethod
+ def setUpClass(cls):
+ if not os.path.isdir(MEDIA_ROOT):
+ os.makedirs(MEDIA_ROOT)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(MEDIA_ROOT)
+
+ def setUp(self):
+ self.obj = FileModel()
+
+ def test_readonly_root(self):
+ """Permission errors are not swallowed"""
+ os.chmod(MEDIA_ROOT, 0o500)
+ self.addCleanup(os.chmod, MEDIA_ROOT, 0o700)
+ try:
+ self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'))
+ except OSError as err:
+ self.assertEqual(err.errno, errno.EACCES)
+ except Exception:
+ self.fail("OSError [Errno %s] not raised." % errno.EACCES)
+
+ def test_not_a_directory(self):
+ """The correct IOError is raised when the upload directory name exists but isn't a directory"""
+ # Create a file with the upload directory name
+ open(UPLOAD_TO, 'wb').close()
+ self.addCleanup(os.remove, UPLOAD_TO)
+ with self.assertRaises(IOError) as exc_info:
+ self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'))
+ # The test needs to be done on a specific string as IOError
+ # is raised even without the patch (just not early enough)
+ self.assertEqual(exc_info.exception.args[0],
+ "%s exists and is not a directory." % UPLOAD_TO)
+
+
+class MultiParserTests(unittest.TestCase):
+
+ def test_empty_upload_handlers(self):
+ # We're not actually parsing here; just checking if the parser properly
+ # instantiates with empty upload handlers.
+ parser = MultiPartParser({
+ 'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
+ 'CONTENT_LENGTH': '1'
+ }, StringIO('x'), [], 'utf-8')