summaryrefslogtreecommitdiff
path: root/tests/messages_tests
diff options
context:
space:
mode:
authorTim Graham <timograham@gmail.com>2015-02-10 08:30:35 -0500
committerTim Graham <timograham@gmail.com>2015-02-11 10:19:22 -0500
commitb3cd9e0d0732e1fd7d2afd5825dda5b3857927da (patch)
tree705f2f18621410b75680e1a520a2b12a9cf2dd7b /tests/messages_tests
parentfac3a34cbb8fd8dc74006f187dd1e83f3a040d39 (diff)
downloaddjango-b3cd9e0d0732e1fd7d2afd5825dda5b3857927da.tar.gz
Moved contrib.messages tests out of contrib.
Diffstat (limited to 'tests/messages_tests')
-rw-r--r--tests/messages_tests/__init__.py0
-rw-r--r--tests/messages_tests/base.py383
-rw-r--r--tests/messages_tests/test_api.py54
-rw-r--r--tests/messages_tests/test_cookie.py179
-rw-r--r--tests/messages_tests/test_fallback.py176
-rw-r--r--tests/messages_tests/test_middleware.py19
-rw-r--r--tests/messages_tests/test_mixins.py16
-rw-r--r--tests/messages_tests/test_session.py54
-rw-r--r--tests/messages_tests/urls.py80
9 files changed, 961 insertions, 0 deletions
diff --git a/tests/messages_tests/__init__.py b/tests/messages_tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/messages_tests/__init__.py
diff --git a/tests/messages_tests/base.py b/tests/messages_tests/base.py
new file mode 100644
index 0000000000..ad2d77cafb
--- /dev/null
+++ b/tests/messages_tests/base.py
@@ -0,0 +1,383 @@
+from django import http
+from django.contrib.messages import constants, get_level, set_level, utils
+from django.contrib.messages.api import MessageFailure
+from django.contrib.messages.constants import DEFAULT_LEVELS
+from django.contrib.messages.storage import base, default_storage
+from django.contrib.messages.storage.base import Message
+from django.core.urlresolvers import reverse
+from django.test import modify_settings, override_settings
+from django.utils.translation import ugettext_lazy
+
+
+def add_level_messages(storage):
+ """
+ Adds 6 messages from different levels (including a custom one) to a storage
+ instance.
+ """
+ storage.add(constants.INFO, 'A generic info message')
+ storage.add(29, 'Some custom level')
+ storage.add(constants.DEBUG, 'A debugging message', extra_tags='extra-tag')
+ storage.add(constants.WARNING, 'A warning')
+ storage.add(constants.ERROR, 'An error')
+ storage.add(constants.SUCCESS, 'This was a triumph.')
+
+
+class override_settings_tags(override_settings):
+ def enable(self):
+ super(override_settings_tags, self).enable()
+ # LEVEL_TAGS is a constant defined in the
+ # django.contrib.messages.storage.base module, so after changing
+ # settings.MESSAGE_TAGS, we need to update that constant too.
+ self.old_level_tags = base.LEVEL_TAGS
+ base.LEVEL_TAGS = utils.get_level_tags()
+
+ def disable(self):
+ super(override_settings_tags, self).disable()
+ base.LEVEL_TAGS = self.old_level_tags
+
+
+class BaseTests(object):
+ storage_class = default_storage
+ levels = {
+ 'debug': constants.DEBUG,
+ 'info': constants.INFO,
+ 'success': constants.SUCCESS,
+ 'warning': constants.WARNING,
+ 'error': constants.ERROR,
+ }
+
+ def setUp(self):
+ self.settings_override = override_settings_tags(
+ TEMPLATES=[{
+ 'BACKEND': 'django.template.backends.django.DjangoTemplates',
+ 'DIRS': [],
+ 'APP_DIRS': True,
+ 'OPTIONS': {
+ 'context_processors': (
+ 'django.contrib.auth.context_processors.auth',
+ 'django.contrib.messages.context_processors.messages',
+ ),
+ },
+ }],
+ ROOT_URLCONF='messages_tests.urls',
+ MESSAGE_TAGS='',
+ MESSAGE_STORAGE='%s.%s' % (self.storage_class.__module__,
+ self.storage_class.__name__),
+ SESSION_SERIALIZER='django.contrib.sessions.serializers.JSONSerializer',
+ )
+ self.settings_override.enable()
+
+ def tearDown(self):
+ self.settings_override.disable()
+
+ def get_request(self):
+ return http.HttpRequest()
+
+ def get_response(self):
+ return http.HttpResponse()
+
+ def get_storage(self, data=None):
+ """
+ Returns the storage backend, setting its loaded data to the ``data``
+ argument.
+
+ This method avoids the storage ``_get`` method from getting called so
+ that other parts of the storage backend can be tested independent of
+ the message retrieval logic.
+ """
+ storage = self.storage_class(self.get_request())
+ storage._loaded_data = data or []
+ return storage
+
+ def test_add(self):
+ storage = self.get_storage()
+ self.assertFalse(storage.added_new)
+ storage.add(constants.INFO, 'Test message 1')
+ self.assertTrue(storage.added_new)
+ storage.add(constants.INFO, 'Test message 2', extra_tags='tag')
+ self.assertEqual(len(storage), 2)
+
+ def test_add_lazy_translation(self):
+ storage = self.get_storage()
+ response = self.get_response()
+
+ storage.add(constants.INFO, ugettext_lazy('lazy message'))
+ storage.update(response)
+
+ storing = self.stored_messages_count(storage, response)
+ self.assertEqual(storing, 1)
+
+ def test_no_update(self):
+ storage = self.get_storage()
+ response = self.get_response()
+ storage.update(response)
+ storing = self.stored_messages_count(storage, response)
+ self.assertEqual(storing, 0)
+
+ def test_add_update(self):
+ storage = self.get_storage()
+ response = self.get_response()
+
+ storage.add(constants.INFO, 'Test message 1')
+ storage.add(constants.INFO, 'Test message 1', extra_tags='tag')
+ storage.update(response)
+
+ storing = self.stored_messages_count(storage, response)
+ self.assertEqual(storing, 2)
+
+ def test_existing_add_read_update(self):
+ storage = self.get_existing_storage()
+ response = self.get_response()
+
+ storage.add(constants.INFO, 'Test message 3')
+ list(storage) # Simulates a read
+ storage.update(response)
+
+ storing = self.stored_messages_count(storage, response)
+ self.assertEqual(storing, 0)
+
+ def test_existing_read_add_update(self):
+ storage = self.get_existing_storage()
+ response = self.get_response()
+
+ list(storage) # Simulates a read
+ storage.add(constants.INFO, 'Test message 3')
+ storage.update(response)
+
+ storing = self.stored_messages_count(storage, response)
+ self.assertEqual(storing, 1)
+
+ @override_settings(MESSAGE_LEVEL=constants.DEBUG)
+ def test_full_request_response_cycle(self):
+ """
+ With the message middleware enabled, tests that messages are properly
+ stored and then retrieved across the full request/redirect/response
+ cycle.
+ """
+ data = {
+ 'messages': ['Test message %d' % x for x in range(5)],
+ }
+ show_url = reverse('show_message')
+ for level in ('debug', 'info', 'success', 'warning', 'error'):
+ add_url = reverse('add_message', args=(level,))
+ response = self.client.post(add_url, data, follow=True)
+ self.assertRedirects(response, show_url)
+ self.assertIn('messages', response.context)
+ messages = [Message(self.levels[level], msg) for msg in data['messages']]
+ self.assertEqual(list(response.context['messages']), messages)
+ for msg in data['messages']:
+ self.assertContains(response, msg)
+
+ @override_settings(MESSAGE_LEVEL=constants.DEBUG)
+ def test_with_template_response(self):
+ data = {
+ 'messages': ['Test message %d' % x for x in range(5)],
+ }
+ show_url = reverse('show_template_response')
+ for level in self.levels.keys():
+ add_url = reverse('add_template_response', args=(level,))
+ response = self.client.post(add_url, data, follow=True)
+ self.assertRedirects(response, show_url)
+ self.assertIn('messages', response.context)
+ for msg in data['messages']:
+ self.assertContains(response, msg)
+
+ # there shouldn't be any messages on second GET request
+ response = self.client.get(show_url)
+ for msg in data['messages']:
+ self.assertNotContains(response, msg)
+
+ def test_context_processor_message_levels(self):
+ show_url = reverse('show_template_response')
+ response = self.client.get(show_url)
+
+ self.assertIn('DEFAULT_MESSAGE_LEVELS', response.context)
+ self.assertEqual(response.context['DEFAULT_MESSAGE_LEVELS'], DEFAULT_LEVELS)
+
+ @override_settings(MESSAGE_LEVEL=constants.DEBUG)
+ def test_multiple_posts(self):
+ """
+ Tests that messages persist properly when multiple POSTs are made
+ before a GET.
+ """
+ data = {
+ 'messages': ['Test message %d' % x for x in range(5)],
+ }
+ show_url = reverse('show_message')
+ messages = []
+ for level in ('debug', 'info', 'success', 'warning', 'error'):
+ messages.extend(Message(self.levels[level], msg) for msg in data['messages'])
+ add_url = reverse('add_message', args=(level,))
+ self.client.post(add_url, data)
+ response = self.client.get(show_url)
+ self.assertIn('messages', response.context)
+ self.assertEqual(list(response.context['messages']), messages)
+ for msg in data['messages']:
+ self.assertContains(response, msg)
+
+ @modify_settings(
+ INSTALLED_APPS={'remove': 'django.contrib.messages'},
+ MIDDLEWARE_CLASSES={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
+ )
+ @override_settings(
+ MESSAGE_LEVEL=constants.DEBUG,
+ TEMPLATES=[{
+ 'BACKEND': 'django.template.backends.django.DjangoTemplates',
+ 'DIRS': [],
+ 'APP_DIRS': True,
+ }],
+ )
+ def test_middleware_disabled(self):
+ """
+ Tests that, when the middleware is disabled, an exception is raised
+ when one attempts to store a message.
+ """
+ data = {
+ 'messages': ['Test message %d' % x for x in range(5)],
+ }
+ reverse('show_message')
+ for level in ('debug', 'info', 'success', 'warning', 'error'):
+ add_url = reverse('add_message', args=(level,))
+ self.assertRaises(MessageFailure, self.client.post, add_url,
+ data, follow=True)
+
+ @modify_settings(
+ INSTALLED_APPS={'remove': 'django.contrib.messages'},
+ MIDDLEWARE_CLASSES={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
+ )
+ @override_settings(
+ TEMPLATES=[{
+ 'BACKEND': 'django.template.backends.django.DjangoTemplates',
+ 'DIRS': [],
+ 'APP_DIRS': True,
+ }],
+ )
+ def test_middleware_disabled_fail_silently(self):
+ """
+ Tests that, when the middleware is disabled, an exception is not
+ raised if 'fail_silently' = True
+ """
+ data = {
+ 'messages': ['Test message %d' % x for x in range(5)],
+ 'fail_silently': True,
+ }
+ show_url = reverse('show_message')
+ for level in ('debug', 'info', 'success', 'warning', 'error'):
+ add_url = reverse('add_message', args=(level,))
+ response = self.client.post(add_url, data, follow=True)
+ self.assertRedirects(response, show_url)
+ self.assertNotIn('messages', response.context)
+
+ def stored_messages_count(self, storage, response):
+ """
+ Returns the number of messages being stored after a
+ ``storage.update()`` call.
+ """
+ raise NotImplementedError('This method must be set by a subclass.')
+
+ def test_get(self):
+ raise NotImplementedError('This method must be set by a subclass.')
+
+ def get_existing_storage(self):
+ return self.get_storage([Message(constants.INFO, 'Test message 1'),
+ Message(constants.INFO, 'Test message 2',
+ extra_tags='tag')])
+
+ def test_existing_read(self):
+ """
+ Tests that reading the existing storage doesn't cause the data to be
+ lost.
+ """
+ storage = self.get_existing_storage()
+ self.assertFalse(storage.used)
+ # After iterating the storage engine directly, the used flag is set.
+ data = list(storage)
+ self.assertTrue(storage.used)
+ # The data does not disappear because it has been iterated.
+ self.assertEqual(data, list(storage))
+
+ def test_existing_add(self):
+ storage = self.get_existing_storage()
+ self.assertFalse(storage.added_new)
+ storage.add(constants.INFO, 'Test message 3')
+ self.assertTrue(storage.added_new)
+
+ def test_default_level(self):
+ # get_level works even with no storage on the request.
+ request = self.get_request()
+ self.assertEqual(get_level(request), constants.INFO)
+
+ # get_level returns the default level if it hasn't been set.
+ storage = self.get_storage()
+ request._messages = storage
+ self.assertEqual(get_level(request), constants.INFO)
+
+ # Only messages of sufficient level get recorded.
+ add_level_messages(storage)
+ self.assertEqual(len(storage), 5)
+
+ def test_low_level(self):
+ request = self.get_request()
+ storage = self.storage_class(request)
+ request._messages = storage
+
+ self.assertTrue(set_level(request, 5))
+ self.assertEqual(get_level(request), 5)
+
+ add_level_messages(storage)
+ self.assertEqual(len(storage), 6)
+
+ def test_high_level(self):
+ request = self.get_request()
+ storage = self.storage_class(request)
+ request._messages = storage
+
+ self.assertTrue(set_level(request, 30))
+ self.assertEqual(get_level(request), 30)
+
+ add_level_messages(storage)
+ self.assertEqual(len(storage), 2)
+
+ @override_settings(MESSAGE_LEVEL=29)
+ def test_settings_level(self):
+ request = self.get_request()
+ storage = self.storage_class(request)
+
+ self.assertEqual(get_level(request), 29)
+
+ add_level_messages(storage)
+ self.assertEqual(len(storage), 3)
+
+ def test_tags(self):
+ storage = self.get_storage()
+ storage.level = 0
+ add_level_messages(storage)
+ tags = [msg.tags for msg in storage]
+ self.assertEqual(tags,
+ ['info', '', 'extra-tag debug', 'warning', 'error',
+ 'success'])
+
+ def test_level_tag(self):
+ storage = self.get_storage()
+ storage.level = 0
+ add_level_messages(storage)
+ tags = [msg.level_tag for msg in storage]
+ self.assertEqual(tags,
+ ['info', '', 'debug', 'warning', 'error',
+ 'success'])
+
+ @override_settings_tags(MESSAGE_TAGS={
+ constants.INFO: 'info',
+ constants.DEBUG: '',
+ constants.WARNING: '',
+ constants.ERROR: 'bad',
+ 29: 'custom',
+ }
+ )
+ def test_custom_tags(self):
+ storage = self.get_storage()
+ storage.level = 0
+ add_level_messages(storage)
+ tags = [msg.tags for msg in storage]
+ self.assertEqual(tags,
+ ['info', 'custom', 'extra-tag', '', 'bad', 'success'])
diff --git a/tests/messages_tests/test_api.py b/tests/messages_tests/test_api.py
new file mode 100644
index 0000000000..f79db95866
--- /dev/null
+++ b/tests/messages_tests/test_api.py
@@ -0,0 +1,54 @@
+from django.contrib import messages
+from django.test import RequestFactory, TestCase
+
+
+class DummyStorage(object):
+ """
+ dummy message-store to test the api methods
+ """
+
+ def __init__(self):
+ self.store = []
+
+ def add(self, level, message, extra_tags=''):
+ self.store.append(message)
+
+
+class ApiTest(TestCase):
+ def setUp(self):
+ self.rf = RequestFactory()
+ self.request = self.rf.request()
+ self.storage = DummyStorage()
+
+ def test_ok(self):
+ msg = 'some message'
+
+ self.request._messages = self.storage
+ messages.add_message(self.request, messages.DEBUG, msg)
+ self.assertIn(msg, self.storage.store)
+
+ def test_request_is_none(self):
+ msg = 'some message'
+
+ self.request._messages = self.storage
+
+ with self.assertRaises(TypeError):
+ messages.add_message(None, messages.DEBUG, msg)
+
+ self.assertEqual([], self.storage.store)
+
+ def test_middleware_missing(self):
+ msg = 'some message'
+
+ with self.assertRaises(messages.MessageFailure):
+ messages.add_message(self.request, messages.DEBUG, msg)
+
+ self.assertEqual([], self.storage.store)
+
+ def test_middleware_missing_silently(self):
+ msg = 'some message'
+
+ messages.add_message(self.request, messages.DEBUG, msg,
+ fail_silently=True)
+
+ self.assertEqual([], self.storage.store)
diff --git a/tests/messages_tests/test_cookie.py b/tests/messages_tests/test_cookie.py
new file mode 100644
index 0000000000..1ce04bd691
--- /dev/null
+++ b/tests/messages_tests/test_cookie.py
@@ -0,0 +1,179 @@
+import json
+
+from django.contrib.messages import constants
+from django.contrib.messages.storage.base import Message
+from django.contrib.messages.storage.cookie import (
+ CookieStorage, MessageDecoder, MessageEncoder,
+)
+from django.test import TestCase, override_settings
+from django.utils.safestring import SafeData, mark_safe
+
+from .base import BaseTests
+
+
+def set_cookie_data(storage, messages, invalid=False, encode_empty=False):
+ """
+ Sets ``request.COOKIES`` with the encoded data and removes the storage
+ backend's loaded data cache.
+ """
+ encoded_data = storage._encode(messages, encode_empty=encode_empty)
+ if invalid:
+ # Truncate the first character so that the hash is invalid.
+ encoded_data = encoded_data[1:]
+ storage.request.COOKIES = {CookieStorage.cookie_name: encoded_data}
+ if hasattr(storage, '_loaded_data'):
+ del storage._loaded_data
+
+
+def stored_cookie_messages_count(storage, response):
+ """
+ Returns an integer containing the number of messages stored.
+ """
+ # Get a list of cookies, excluding ones with a max-age of 0 (because
+ # they have been marked for deletion).
+ cookie = response.cookies.get(storage.cookie_name)
+ if not cookie or cookie['max-age'] == 0:
+ return 0
+ data = storage._decode(cookie.value)
+ if not data:
+ return 0
+ if data[-1] == CookieStorage.not_finished:
+ data.pop()
+ return len(data)
+
+
+@override_settings(SESSION_COOKIE_DOMAIN='.example.com', SESSION_COOKIE_SECURE=True, SESSION_COOKIE_HTTPONLY=True)
+class CookieTest(BaseTests, TestCase):
+ storage_class = CookieStorage
+
+ def stored_messages_count(self, storage, response):
+ return stored_cookie_messages_count(storage, response)
+
+ def test_get(self):
+ storage = self.storage_class(self.get_request())
+ # Set initial data.
+ example_messages = ['test', 'me']
+ set_cookie_data(storage, example_messages)
+ # Test that the message actually contains what we expect.
+ self.assertEqual(list(storage), example_messages)
+
+ def test_cookie_setings(self):
+ """
+ Ensure that CookieStorage honors SESSION_COOKIE_DOMAIN, SESSION_COOKIE_SECURE and SESSION_COOKIE_HTTPONLY
+ Refs #15618 and #20972.
+ """
+ # Test before the messages have been consumed
+ storage = self.get_storage()
+ response = self.get_response()
+ storage.add(constants.INFO, 'test')
+ storage.update(response)
+ self.assertIn('test', response.cookies['messages'].value)
+ self.assertEqual(response.cookies['messages']['domain'], '.example.com')
+ self.assertEqual(response.cookies['messages']['expires'], '')
+ self.assertEqual(response.cookies['messages']['secure'], True)
+ self.assertEqual(response.cookies['messages']['httponly'], True)
+
+ # Test deletion of the cookie (storing with an empty value) after the messages have been consumed
+ storage = self.get_storage()
+ response = self.get_response()
+ storage.add(constants.INFO, 'test')
+ for m in storage:
+ pass # Iterate through the storage to simulate consumption of messages.
+ storage.update(response)
+ self.assertEqual(response.cookies['messages'].value, '')
+ self.assertEqual(response.cookies['messages']['domain'], '.example.com')
+ self.assertEqual(response.cookies['messages']['expires'], 'Thu, 01-Jan-1970 00:00:00 GMT')
+
+ def test_get_bad_cookie(self):
+ request = self.get_request()
+ storage = self.storage_class(request)
+ # Set initial (invalid) data.
+ example_messages = ['test', 'me']
+ set_cookie_data(storage, example_messages, invalid=True)
+ # Test that the message actually contains what we expect.
+ self.assertEqual(list(storage), [])
+
+ def test_max_cookie_length(self):
+ """
+ Tests that, if the data exceeds what is allowed in a cookie, older
+ messages are removed before saving (and returned by the ``update``
+ method).
+ """
+ storage = self.get_storage()
+ response = self.get_response()
+
+ # When storing as a cookie, the cookie has constant overhead of approx
+ # 54 chars, and each message has a constant overhead of about 37 chars
+ # and a variable overhead of zero in the best case. We aim for a message
+ # size which will fit 4 messages into the cookie, but not 5.
+ # See also FallbackTest.test_session_fallback
+ msg_size = int((CookieStorage.max_cookie_size - 54) / 4.5 - 37)
+ for i in range(5):
+ storage.add(constants.INFO, str(i) * msg_size)
+ unstored_messages = storage.update(response)
+
+ cookie_storing = self.stored_messages_count(storage, response)
+ self.assertEqual(cookie_storing, 4)
+
+ self.assertEqual(len(unstored_messages), 1)
+ self.assertEqual(unstored_messages[0].message, '0' * msg_size)
+
+ def test_json_encoder_decoder(self):
+ """
+ Tests that a complex nested data structure containing Message
+ instances is properly encoded/decoded by the custom JSON
+ encoder/decoder classes.
+ """
+ messages = [
+ {
+ 'message': Message(constants.INFO, 'Test message'),
+ 'message_list': [Message(constants.INFO, 'message %s')
+ for x in range(5)] + [{'another-message':
+ Message(constants.ERROR, 'error')}],
+ },
+ Message(constants.INFO, 'message %s'),
+ ]
+ encoder = MessageEncoder(separators=(',', ':'))
+ value = encoder.encode(messages)
+ decoded_messages = json.loads(value, cls=MessageDecoder)
+ self.assertEqual(messages, decoded_messages)
+
+ def test_safedata(self):
+ """
+ Tests that a message containing SafeData is keeping its safe status when
+ retrieved from the message storage.
+ """
+ def encode_decode(data):
+ message = Message(constants.DEBUG, data)
+ encoded = storage._encode(message)
+ decoded = storage._decode(encoded)
+ return decoded.message
+
+ storage = self.get_storage()
+
+ self.assertIsInstance(
+ encode_decode(mark_safe("<b>Hello Django!</b>")), SafeData)
+ self.assertNotIsInstance(
+ encode_decode("<b>Hello Django!</b>"), SafeData)
+
+ def test_pre_1_5_message_format(self):
+ """
+ For ticket #22426. Tests whether messages that were set in the cookie
+ before the addition of is_safedata are decoded correctly.
+ """
+
+ # Encode the messages using the current encoder.
+ messages = [Message(constants.INFO, 'message %s') for x in range(5)]
+ encoder = MessageEncoder(separators=(',', ':'))
+ encoded_messages = encoder.encode(messages)
+
+ # Remove the is_safedata flag from the messages in order to imitate
+ # the behavior of before 1.5 (monkey patching).
+ encoded_messages = json.loads(encoded_messages)
+ for obj in encoded_messages:
+ obj.pop(1)
+ encoded_messages = json.dumps(encoded_messages, separators=(',', ':'))
+
+ # Decode the messages in the old format (without is_safedata)
+ decoded_messages = json.loads(encoded_messages, cls=MessageDecoder)
+ self.assertEqual(messages, decoded_messages)
diff --git a/tests/messages_tests/test_fallback.py b/tests/messages_tests/test_fallback.py
new file mode 100644
index 0000000000..a57acc728a
--- /dev/null
+++ b/tests/messages_tests/test_fallback.py
@@ -0,0 +1,176 @@
+from django.contrib.messages import constants
+from django.contrib.messages.storage.fallback import (
+ CookieStorage, FallbackStorage,
+)
+from django.test import TestCase
+
+from .base import BaseTests
+from .test_cookie import set_cookie_data, stored_cookie_messages_count
+from .test_session import set_session_data, stored_session_messages_count
+
+
+class FallbackTest(BaseTests, TestCase):
+ storage_class = FallbackStorage
+
+ def get_request(self):
+ self.session = {}
+ request = super(FallbackTest, self).get_request()
+ request.session = self.session
+ return request
+
+ def get_cookie_storage(self, storage):
+ return storage.storages[-2]
+
+ def get_session_storage(self, storage):
+ return storage.storages[-1]
+
+ def stored_cookie_messages_count(self, storage, response):
+ return stored_cookie_messages_count(self.get_cookie_storage(storage),
+ response)
+
+ def stored_session_messages_count(self, storage, response):
+ return stored_session_messages_count(self.get_session_storage(storage))
+
+ def stored_messages_count(self, storage, response):
+ """
+ Return the storage totals from both cookie and session backends.
+ """
+ total = (self.stored_cookie_messages_count(storage, response) +
+ self.stored_session_messages_count(storage, response))
+ return total
+
+ def test_get(self):
+ request = self.get_request()
+ storage = self.storage_class(request)
+ cookie_storage = self.get_cookie_storage(storage)
+
+ # Set initial cookie data.
+ example_messages = [str(i) for i in range(5)]
+ set_cookie_data(cookie_storage, example_messages)
+
+ # Overwrite the _get method of the fallback storage to prove it is not
+ # used (it would cause a TypeError: 'NoneType' object is not callable).
+ self.get_session_storage(storage)._get = None
+
+ # Test that the message actually contains what we expect.
+ self.assertEqual(list(storage), example_messages)
+
+ def test_get_empty(self):
+ request = self.get_request()
+ storage = self.storage_class(request)
+
+ # Overwrite the _get method of the fallback storage to prove it is not
+ # used (it would cause a TypeError: 'NoneType' object is not callable).
+ self.get_session_storage(storage)._get = None
+
+ # Test that the message actually contains what we expect.
+ self.assertEqual(list(storage), [])
+
+ def test_get_fallback(self):
+ request = self.get_request()
+ storage = self.storage_class(request)
+ cookie_storage = self.get_cookie_storage(storage)
+ session_storage = self.get_session_storage(storage)
+
+ # Set initial cookie and session data.
+ example_messages = [str(i) for i in range(5)]
+ set_cookie_data(cookie_storage, example_messages[:4] +
+ [CookieStorage.not_finished])
+ set_session_data(session_storage, example_messages[4:])
+
+ # Test that the message actually contains what we expect.
+ self.assertEqual(list(storage), example_messages)
+
+ def test_get_fallback_only(self):
+ request = self.get_request()
+ storage = self.storage_class(request)
+ cookie_storage = self.get_cookie_storage(storage)
+ session_storage = self.get_session_storage(storage)
+
+ # Set initial cookie and session data.
+ example_messages = [str(i) for i in range(5)]
+ set_cookie_data(cookie_storage, [CookieStorage.not_finished],
+ encode_empty=True)
+ set_session_data(session_storage, example_messages)
+
+ # Test that the message actually contains what we expect.
+ self.assertEqual(list(storage), example_messages)
+
+ def test_flush_used_backends(self):
+ request = self.get_request()
+ storage = self.storage_class(request)
+ cookie_storage = self.get_cookie_storage(storage)
+ session_storage = self.get_session_storage(storage)
+
+ # Set initial cookie and session data.
+ set_cookie_data(cookie_storage, ['cookie', CookieStorage.not_finished])
+ set_session_data(session_storage, ['session'])
+
+ # When updating, previously used but no longer needed backends are
+ # flushed.
+ response = self.get_response()
+ list(storage)
+ storage.update(response)
+ session_storing = self.stored_session_messages_count(storage, response)
+ self.assertEqual(session_storing, 0)
+
+ def test_no_fallback(self):
+ """
+ Confirms that:
+
+ (1) A short number of messages whose data size doesn't exceed what is
+ allowed in a cookie will all be stored in the CookieBackend.
+
+ (2) If the CookieBackend can store all messages, the SessionBackend
+ won't be written to at all.
+ """
+ storage = self.get_storage()
+ response = self.get_response()
+
+ # Overwrite the _store method of the fallback storage to prove it isn't
+ # used (it would cause a TypeError: 'NoneType' object is not callable).
+ self.get_session_storage(storage)._store = None
+
+ for i in range(5):
+ storage.add(constants.INFO, str(i) * 100)
+ storage.update(response)
+
+ cookie_storing = self.stored_cookie_messages_count(storage, response)
+ self.assertEqual(cookie_storing, 5)
+ session_storing = self.stored_session_messages_count(storage, response)
+ self.assertEqual(session_storing, 0)
+
+ def test_session_fallback(self):
+ """
+ Confirms that, if the data exceeds what is allowed in a cookie,
+ messages which did not fit are stored in the SessionBackend.
+ """
+ storage = self.get_storage()
+ response = self.get_response()
+
+ # see comment in CookieText.test_cookie_max_length
+ msg_size = int((CookieStorage.max_cookie_size - 54) / 4.5 - 37)
+ for i in range(5):
+ storage.add(constants.INFO, str(i) * msg_size)
+ storage.update(response)
+
+ cookie_storing = self.stored_cookie_messages_count(storage, response)
+ self.assertEqual(cookie_storing, 4)
+ session_storing = self.stored_session_messages_count(storage, response)
+ self.assertEqual(session_storing, 1)
+
+ def test_session_fallback_only(self):
+ """
+ Confirms that large messages, none of which fit in a cookie, are stored
+ in the SessionBackend (and nothing is stored in the CookieBackend).
+ """
+ storage = self.get_storage()
+ response = self.get_response()
+
+ storage.add(constants.INFO, 'x' * 5000)
+ storage.update(response)
+
+ cookie_storing = self.stored_cookie_messages_count(storage, response)
+ self.assertEqual(cookie_storing, 0)
+ session_storing = self.stored_session_messages_count(storage, response)
+ self.assertEqual(session_storing, 1)
diff --git a/tests/messages_tests/test_middleware.py b/tests/messages_tests/test_middleware.py
new file mode 100644
index 0000000000..855f6b32a7
--- /dev/null
+++ b/tests/messages_tests/test_middleware.py
@@ -0,0 +1,19 @@
+import unittest
+
+from django import http
+from django.contrib.messages.middleware import MessageMiddleware
+
+
+class MiddlewareTest(unittest.TestCase):
+
+ def setUp(self):
+ self.middleware = MessageMiddleware()
+
+ def test_response_without_messages(self):
+ """
+ Makes sure that the response middleware is tolerant of messages not
+ existing on request.
+ """
+ request = http.HttpRequest()
+ response = http.HttpResponse()
+ self.middleware.process_response(request, response)
diff --git a/tests/messages_tests/test_mixins.py b/tests/messages_tests/test_mixins.py
new file mode 100644
index 0000000000..a90a37e98b
--- /dev/null
+++ b/tests/messages_tests/test_mixins.py
@@ -0,0 +1,16 @@
+from django.core.urlresolvers import reverse
+from django.test import TestCase, override_settings
+
+from .urls import ContactFormViewWithMsg
+
+
+@override_settings(ROOT_URLCONF='messages_tests.urls')
+class SuccessMessageMixinTests(TestCase):
+
+ def test_set_messages_success(self):
+ author = {'name': 'John Doe',
+ 'slug': 'success-msg'}
+ add_url = reverse('add_success_msg')
+ req = self.client.post(add_url, author)
+ self.assertIn(ContactFormViewWithMsg.success_message % author,
+ req.cookies['messages'].value)
diff --git a/tests/messages_tests/test_session.py b/tests/messages_tests/test_session.py
new file mode 100644
index 0000000000..3d211f6f8d
--- /dev/null
+++ b/tests/messages_tests/test_session.py
@@ -0,0 +1,54 @@
+from django.contrib.messages import constants
+from django.contrib.messages.storage.base import Message
+from django.contrib.messages.storage.session import SessionStorage
+from django.test import TestCase
+from django.utils.safestring import SafeData, mark_safe
+
+from .base import BaseTests
+
+
+def set_session_data(storage, messages):
+ """
+ Sets the messages into the backend request's session and remove the
+ backend's loaded data cache.
+ """
+ storage.request.session[storage.session_key] = storage.serialize_messages(messages)
+ if hasattr(storage, '_loaded_data'):
+ del storage._loaded_data
+
+
+def stored_session_messages_count(storage):
+ data = storage.deserialize_messages(storage.request.session.get(storage.session_key, []))
+ return len(data)
+
+
+class SessionTest(BaseTests, TestCase):
+ storage_class = SessionStorage
+
+ def get_request(self):
+ self.session = {}
+ request = super(SessionTest, self).get_request()
+ request.session = self.session
+ return request
+
+ def stored_messages_count(self, storage, response):
+ return stored_session_messages_count(storage)
+
+ def test_get(self):
+ storage = self.storage_class(self.get_request())
+ # Set initial data.
+ example_messages = ['test', 'me']
+ set_session_data(storage, example_messages)
+ # Test that the message actually contains what we expect.
+ self.assertEqual(list(storage), example_messages)
+
+ def test_safedata(self):
+ """
+ Tests that a message containing SafeData is keeping its safe status when
+ retrieved from the message storage.
+ """
+ storage = self.get_storage()
+
+ message = Message(constants.DEBUG, mark_safe("<b>Hello Django!</b>"))
+ set_session_data(storage, [message])
+ self.assertIsInstance(list(storage)[0].message, SafeData)
diff --git a/tests/messages_tests/urls.py b/tests/messages_tests/urls.py
new file mode 100644
index 0000000000..d748690044
--- /dev/null
+++ b/tests/messages_tests/urls.py
@@ -0,0 +1,80 @@
+from django import forms
+from django.conf.urls import url
+from django.contrib import messages
+from django.contrib.messages.views import SuccessMessageMixin
+from django.core.urlresolvers import reverse
+from django.http import HttpResponse, HttpResponseRedirect
+from django.template import engines
+from django.template.response import TemplateResponse
+from django.views.decorators.cache import never_cache
+from django.views.generic.edit import FormView
+
+
+TEMPLATE = """{% if messages %}
+<ul class="messages">
+ {% for message in messages %}
+ <li{% if message.tags %} class="{{ message.tags }}"{% endif %}>
+ {{ message }}
+ </li>
+ {% endfor %}
+</ul>
+{% endif %}
+"""
+
+
+@never_cache
+def add(request, message_type):
+ # don't default to False here, because we want to test that it defaults
+ # to False if unspecified
+ fail_silently = request.POST.get('fail_silently', None)
+ for msg in request.POST.getlist('messages'):
+ if fail_silently is not None:
+ getattr(messages, message_type)(request, msg,
+ fail_silently=fail_silently)
+ else:
+ getattr(messages, message_type)(request, msg)
+
+ show_url = reverse('show_message')
+ return HttpResponseRedirect(show_url)
+
+
+@never_cache
+def add_template_response(request, message_type):
+ for msg in request.POST.getlist('messages'):
+ getattr(messages, message_type)(request, msg)
+
+ show_url = reverse('show_template_response')
+ return HttpResponseRedirect(show_url)
+
+
+@never_cache
+def show(request):
+ template = engines['django'].from_string(TEMPLATE)
+ return HttpResponse(template.render(request=request))
+
+
+@never_cache
+def show_template_response(request):
+ template = engines['django'].from_string(TEMPLATE)
+ return TemplateResponse(request, template)
+
+
+class ContactForm(forms.Form):
+ name = forms.CharField(required=True)
+ slug = forms.SlugField(required=True)
+
+
+class ContactFormViewWithMsg(SuccessMessageMixin, FormView):
+ form_class = ContactForm
+ success_url = show
+ success_message = "%(name)s was created successfully"
+
+
+urlpatterns = [
+ url('^add/(debug|info|success|warning|error)/$', add, name='add_message'),
+ url('^add/msg/$', ContactFormViewWithMsg.as_view(), name='add_success_msg'),
+ url('^show/$', show, name='show_message'),
+ url('^template_response/add/(debug|info|success|warning|error)/$',
+ add_template_response, name='add_template_response'),
+ url('^template_response/show/$', show_template_response, name='show_template_response'),
+]