diff options
Diffstat (limited to 'tests/mail/tests.py')
-rw-r--r-- | tests/mail/tests.py | 740 |
1 files changed, 740 insertions, 0 deletions
diff --git a/tests/mail/tests.py b/tests/mail/tests.py new file mode 100644 index 0000000000..059dd6d09a --- /dev/null +++ b/tests/mail/tests.py @@ -0,0 +1,740 @@ +# coding: utf-8 +from __future__ import unicode_literals + +import asyncore +import email +import os +import shutil +import smtpd +import sys +import tempfile +import threading + +from django.core import mail +from django.core.mail import (EmailMessage, mail_admins, mail_managers, + EmailMultiAlternatives, send_mail, send_mass_mail) +from django.core.mail.backends import console, dummy, locmem, filebased, smtp +from django.core.mail.message import BadHeaderError +from django.test import TestCase +from django.test.utils import override_settings +from django.utils.encoding import force_str, force_text +from django.utils.six import PY3, StringIO +from django.utils.translation import ugettext_lazy + + +class MailTests(TestCase): + """ + Non-backend specific tests. + """ + + def test_ascii(self): + email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) + message = email.message() + self.assertEqual(message['Subject'], 'Subject') + self.assertEqual(message.get_payload(), 'Content') + self.assertEqual(message['From'], 'from@example.com') + self.assertEqual(message['To'], 'to@example.com') + + def test_multiple_recipients(self): + email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com']) + message = email.message() + self.assertEqual(message['Subject'], 'Subject') + self.assertEqual(message.get_payload(), 'Content') + self.assertEqual(message['From'], 'from@example.com') + self.assertEqual(message['To'], 'to@example.com, other@example.com') + + def test_cc(self): + """Regression test for #7722""" + email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com']) + message = email.message() + self.assertEqual(message['Cc'], 'cc@example.com') + self.assertEqual(email.recipients(), ['to@example.com', 'cc@example.com']) + + # Test multiple CC with multiple To + email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com']) + message = email.message() + self.assertEqual(message['Cc'], 'cc@example.com, cc.other@example.com') + self.assertEqual(email.recipients(), ['to@example.com', 'other@example.com', 'cc@example.com', 'cc.other@example.com']) + + # Testing with Bcc + email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com'], bcc=['bcc@example.com']) + message = email.message() + self.assertEqual(message['Cc'], 'cc@example.com, cc.other@example.com') + self.assertEqual(email.recipients(), ['to@example.com', 'other@example.com', 'cc@example.com', 'cc.other@example.com', 'bcc@example.com']) + + def test_recipients_as_tuple(self): + email = EmailMessage('Subject', 'Content', 'from@example.com', ('to@example.com', 'other@example.com'), cc=('cc@example.com', 'cc.other@example.com'), bcc=('bcc@example.com',)) + message = email.message() + self.assertEqual(message['Cc'], 'cc@example.com, cc.other@example.com') + self.assertEqual(email.recipients(), ['to@example.com', 'other@example.com', 'cc@example.com', 'cc.other@example.com', 'bcc@example.com']) + + def test_header_injection(self): + email = EmailMessage('Subject\nInjection Test', 'Content', 'from@example.com', ['to@example.com']) + self.assertRaises(BadHeaderError, email.message) + email = EmailMessage(ugettext_lazy('Subject\nInjection Test'), 'Content', 'from@example.com', ['to@example.com']) + self.assertRaises(BadHeaderError, email.message) + + def test_space_continuation(self): + """ + Test for space continuation character in long (ascii) subject headers (#7747) + """ + email = EmailMessage('Long subject lines that get wrapped should contain a space continuation character to get expected behavior in Outlook and Thunderbird', 'Content', 'from@example.com', ['to@example.com']) + message = email.message() + # Note that in Python 3, maximum line length has increased from 76 to 78 + self.assertEqual(message['Subject'].encode(), b'Long subject lines that get wrapped should contain a space continuation\n character to get expected behavior in Outlook and Thunderbird') + + def test_message_header_overrides(self): + """ + Specifying dates or message-ids in the extra headers overrides the + default values (#9233) + """ + headers = {"date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} + email = EmailMessage('subject', 'content', 'from@example.com', ['to@example.com'], headers=headers) + + self.assertEqual(sorted(email.message().items()), [ + ('Content-Transfer-Encoding', '7bit'), + ('Content-Type', 'text/plain; charset="utf-8"'), + ('From', 'from@example.com'), + ('MIME-Version', '1.0'), + ('Message-ID', 'foo'), + ('Subject', 'subject'), + ('To', 'to@example.com'), + ('date', 'Fri, 09 Nov 2001 01:08:47 -0000'), + ]) + + def test_from_header(self): + """ + Make sure we can manually set the From header (#9214) + """ + email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + message = email.message() + self.assertEqual(message['From'], 'from@example.com') + + def test_to_header(self): + """ + Make sure we can manually set the To header (#17444) + """ + email = EmailMessage('Subject', 'Content', 'bounce@example.com', + ['list-subscriber@example.com', 'list-subscriber2@example.com'], + headers={'To': 'mailing-list@example.com'}) + message = email.message() + self.assertEqual(message['To'], 'mailing-list@example.com') + self.assertEqual(email.to, ['list-subscriber@example.com', 'list-subscriber2@example.com']) + + # If we don't set the To header manually, it should default to the `to` argument to the constructor + email = EmailMessage('Subject', 'Content', 'bounce@example.com', + ['list-subscriber@example.com', 'list-subscriber2@example.com']) + message = email.message() + self.assertEqual(message['To'], 'list-subscriber@example.com, list-subscriber2@example.com') + self.assertEqual(email.to, ['list-subscriber@example.com', 'list-subscriber2@example.com']) + + def test_multiple_message_call(self): + """ + Regression for #13259 - Make sure that headers are not changed when + calling EmailMessage.message() + """ + email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + message = email.message() + self.assertEqual(message['From'], 'from@example.com') + message = email.message() + self.assertEqual(message['From'], 'from@example.com') + + def test_unicode_address_header(self): + """ + Regression for #11144 - When a to/from/cc header contains unicode, + make sure the email addresses are parsed correctly (especially with + regards to commas) + """ + email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>', 'other@example.com']) + self.assertEqual(email.message()['To'], '=?utf-8?q?Firstname_S=C3=BCrname?= <to@example.com>, other@example.com') + email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>', 'other@example.com']) + self.assertEqual(email.message()['To'], '=?utf-8?q?S=C3=BCrname=2C_Firstname?= <to@example.com>, other@example.com') + + def test_unicode_headers(self): + email = EmailMessage("Gżegżółka", "Content", "from@example.com", ["to@example.com"], + headers={"Sender": '"Firstname Sürname" <sender@example.com>', + "Comments": 'My Sürname is non-ASCII'}) + message = email.message() + self.assertEqual(message['Subject'], '=?utf-8?b?R8W8ZWfFvMOzxYJrYQ==?=') + self.assertEqual(message['Sender'], '=?utf-8?q?Firstname_S=C3=BCrname?= <sender@example.com>') + self.assertEqual(message['Comments'], '=?utf-8?q?My_S=C3=BCrname_is_non-ASCII?=') + + def test_safe_mime_multipart(self): + """ + Make sure headers can be set with a different encoding than utf-8 in + SafeMIMEMultipart as well + """ + headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} + subject, from_email, to = 'hello', 'from@example.com', '"Sürname, Firstname" <to@example.com>' + text_content = 'This is an important message.' + html_content = '<p>This is an <strong>important</strong> message.</p>' + msg = EmailMultiAlternatives('Message from Firstname Sürname', text_content, from_email, [to], headers=headers) + msg.attach_alternative(html_content, "text/html") + msg.encoding = 'iso-8859-1' + self.assertEqual(msg.message()['To'], '=?iso-8859-1?q?S=FCrname=2C_Firstname?= <to@example.com>') + self.assertEqual(msg.message()['Subject'], '=?iso-8859-1?q?Message_from_Firstname_S=FCrname?=') + + def test_encoding(self): + """ + Regression for #12791 - Encode body correctly with other encodings + than utf-8 + """ + email = EmailMessage('Subject', 'Firstname Sürname is a great guy.', 'from@example.com', ['other@example.com']) + email.encoding = 'iso-8859-1' + message = email.message() + self.assertTrue(message.as_string().startswith('Content-Type: text/plain; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: other@example.com')) + self.assertEqual(message.get_payload(), 'Firstname S=FCrname is a great guy.') + + # Make sure MIME attachments also works correctly with other encodings than utf-8 + text_content = 'Firstname Sürname is a great guy.' + html_content = '<p>Firstname Sürname is a <strong>great</strong> guy.</p>' + msg = EmailMultiAlternatives('Subject', text_content, 'from@example.com', ['to@example.com']) + msg.encoding = 'iso-8859-1' + msg.attach_alternative(html_content, "text/html") + self.assertEqual(msg.message().get_payload(0).as_string(), 'Content-Type: text/plain; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\n\nFirstname S=FCrname is a great guy.') + self.assertEqual(msg.message().get_payload(1).as_string(), 'Content-Type: text/html; charset="iso-8859-1"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\n\n<p>Firstname S=FCrname is a <strong>great</strong> guy.</p>') + + def test_attachments(self): + """Regression test for #9367""" + headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} + subject, from_email, to = 'hello', 'from@example.com', 'to@example.com' + text_content = 'This is an important message.' + html_content = '<p>This is an <strong>important</strong> message.</p>' + msg = EmailMultiAlternatives(subject, text_content, from_email, [to], headers=headers) + msg.attach_alternative(html_content, "text/html") + msg.attach("an attachment.pdf", b"%PDF-1.4.%...", mimetype="application/pdf") + msg_str = msg.message().as_string() + message = email.message_from_string(msg_str) + self.assertTrue(message.is_multipart()) + self.assertEqual(message.get_content_type(), 'multipart/mixed') + self.assertEqual(message.get_default_type(), 'text/plain') + payload = message.get_payload() + self.assertEqual(payload[0].get_content_type(), 'multipart/alternative') + self.assertEqual(payload[1].get_content_type(), 'application/pdf') + + def test_non_ascii_attachment_filename(self): + """Regression test for #14964""" + headers = {"Date": "Fri, 09 Nov 2001 01:08:47 -0000", "Message-ID": "foo"} + subject, from_email, to = 'hello', 'from@example.com', 'to@example.com' + content = 'This is the message.' + msg = EmailMessage(subject, content, from_email, [to], headers=headers) + # Unicode in file name + msg.attach("une pièce jointe.pdf", b"%PDF-1.4.%...", mimetype="application/pdf") + msg_str = msg.message().as_string() + message = email.message_from_string(msg_str) + payload = message.get_payload() + self.assertEqual(payload[1].get_filename(), 'une pièce jointe.pdf') + + def test_dummy_backend(self): + """ + Make sure that dummy backends returns correct number of sent messages + """ + connection = dummy.EmailBackend() + email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + self.assertEqual(connection.send_messages([email, email, email]), 3) + + def test_arbitrary_keyword(self): + """ + Make sure that get_connection() accepts arbitrary keyword that might be + used with custom backends. + """ + c = mail.get_connection(fail_silently=True, foo='bar') + self.assertTrue(c.fail_silently) + + def test_custom_backend(self): + """Test custom backend defined in this suite.""" + conn = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') + self.assertTrue(hasattr(conn, 'test_outbox')) + email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + conn.send_messages([email]) + self.assertEqual(len(conn.test_outbox), 1) + + def test_backend_arg(self): + """Test backend argument of mail.get_connection()""" + self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.smtp.EmailBackend'), smtp.EmailBackend)) + self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.locmem.EmailBackend'), locmem.EmailBackend)) + self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.dummy.EmailBackend'), dummy.EmailBackend)) + self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.console.EmailBackend'), console.EmailBackend)) + tmp_dir = tempfile.mkdtemp() + try: + self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend)) + finally: + shutil.rmtree(tmp_dir) + self.assertTrue(isinstance(mail.get_connection(), locmem.EmailBackend)) + + @override_settings( + EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend', + ADMINS=[('nobody', 'nobody@example.com')], + MANAGERS=[('nobody', 'nobody@example.com')]) + def test_connection_arg(self): + """Test connection argument to send_mail(), et. al.""" + mail.outbox = [] + + # Send using non-default connection + connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') + send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) + self.assertEqual(mail.outbox, []) + self.assertEqual(len(connection.test_outbox), 1) + self.assertEqual(connection.test_outbox[0].subject, 'Subject') + + connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') + send_mass_mail([ + ('Subject1', 'Content1', 'from1@example.com', ['to1@example.com']), + ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']), + ], connection=connection) + self.assertEqual(mail.outbox, []) + self.assertEqual(len(connection.test_outbox), 2) + self.assertEqual(connection.test_outbox[0].subject, 'Subject1') + self.assertEqual(connection.test_outbox[1].subject, 'Subject2') + + connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') + mail_admins('Admin message', 'Content', connection=connection) + self.assertEqual(mail.outbox, []) + self.assertEqual(len(connection.test_outbox), 1) + self.assertEqual(connection.test_outbox[0].subject, '[Django] Admin message') + + connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend') + mail_managers('Manager message', 'Content', connection=connection) + self.assertEqual(mail.outbox, []) + self.assertEqual(len(connection.test_outbox), 1) + self.assertEqual(connection.test_outbox[0].subject, '[Django] Manager message') + + def test_dont_mangle_from_in_body(self): + # Regression for #13433 - Make sure that EmailMessage doesn't mangle + # 'From ' in message body. + email = EmailMessage('Subject', 'From the future', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + self.assertFalse('>From the future' in email.message().as_string()) + + def test_dont_base64_encode(self): + # Ticket #3472 + # Shouldn't use Base64 encoding at all + msg = EmailMessage('Subject', 'UTF-8 encoded body', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + self.assertFalse('Content-Transfer-Encoding: base64' in msg.message().as_string()) + + # Ticket #11212 + # Shouldn't use quoted printable, should detect it can represent content with 7 bit data + msg = EmailMessage('Subject', 'Body with only ASCII characters.', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + s = msg.message().as_string() + self.assertFalse('Content-Transfer-Encoding: quoted-printable' in s) + self.assertTrue('Content-Transfer-Encoding: 7bit' in s) + + # Shouldn't use quoted printable, should detect it can represent content with 8 bit data + msg = EmailMessage('Subject', 'Body with latin characters: àáä.', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + s = msg.message().as_string() + self.assertFalse(str('Content-Transfer-Encoding: quoted-printable') in s) + self.assertTrue(str('Content-Transfer-Encoding: 8bit') in s) + + msg = EmailMessage('Subject', 'Body with non latin characters: А Б В Г Д Е Ж Ѕ З И І К Л М Н О П.', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + s = msg.message().as_string() + self.assertFalse(str('Content-Transfer-Encoding: quoted-printable') in s) + self.assertTrue(str('Content-Transfer-Encoding: 8bit') in s) + + +class BaseEmailBackendTests(object): + email_backend = None + + def setUp(self): + self.settings_override = override_settings(EMAIL_BACKEND=self.email_backend) + self.settings_override.enable() + + def tearDown(self): + self.settings_override.disable() + + def assertStartsWith(self, first, second): + if not first.startswith(second): + self.longMessage = True + self.assertEqual(first[:len(second)], second, "First string doesn't start with the second.") + + def get_mailbox_content(self): + raise NotImplementedError + + def flush_mailbox(self): + raise NotImplementedError + + def get_the_message(self): + mailbox = self.get_mailbox_content() + self.assertEqual(len(mailbox), 1, + "Expected exactly one message, got %d.\n%r" % (len(mailbox), [ + m.as_string() for m in mailbox])) + return mailbox[0] + + def test_send(self): + email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com']) + num_sent = mail.get_connection().send_messages([email]) + self.assertEqual(num_sent, 1) + message = self.get_the_message() + self.assertEqual(message["subject"], "Subject") + self.assertEqual(message.get_payload(), "Content") + self.assertEqual(message["from"], "from@example.com") + self.assertEqual(message.get_all("to"), ["to@example.com"]) + + def test_send_unicode(self): + email = EmailMessage('Chère maman', 'Je t\'aime très fort', 'from@example.com', ['to@example.com']) + num_sent = mail.get_connection().send_messages([email]) + self.assertEqual(num_sent, 1) + message = self.get_the_message() + self.assertEqual(message["subject"], '=?utf-8?q?Ch=C3=A8re_maman?=') + self.assertEqual(force_text(message.get_payload()), 'Je t\'aime très fort') + + def test_send_many(self): + email1 = EmailMessage('Subject', 'Content1', 'from@example.com', ['to@example.com']) + email2 = EmailMessage('Subject', 'Content2', 'from@example.com', ['to@example.com']) + num_sent = mail.get_connection().send_messages([email1, email2]) + self.assertEqual(num_sent, 2) + messages = self.get_mailbox_content() + self.assertEqual(len(messages), 2) + self.assertEqual(messages[0].get_payload(), "Content1") + self.assertEqual(messages[1].get_payload(), "Content2") + + def test_send_verbose_name(self): + email = EmailMessage("Subject", "Content", '"Firstname Sürname" <from@example.com>', + ["to@example.com"]) + email.send() + message = self.get_the_message() + self.assertEqual(message["subject"], "Subject") + self.assertEqual(message.get_payload(), "Content") + self.assertEqual(message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= <from@example.com>") + + @override_settings(MANAGERS=[('nobody', 'nobody@example.com')]) + def test_html_mail_managers(self): + """Test html_message argument to mail_managers""" + mail_managers('Subject', 'Content', html_message='HTML Content') + message = self.get_the_message() + + self.assertEqual(message.get('subject'), '[Django] Subject') + self.assertEqual(message.get_all('to'), ['nobody@example.com']) + self.assertTrue(message.is_multipart()) + self.assertEqual(len(message.get_payload()), 2) + self.assertEqual(message.get_payload(0).get_payload(), 'Content') + self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain') + self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content') + self.assertEqual(message.get_payload(1).get_content_type(), 'text/html') + + @override_settings(ADMINS=[('nobody', 'nobody@example.com')]) + def test_html_mail_admins(self): + """Test html_message argument to mail_admins """ + mail_admins('Subject', 'Content', html_message='HTML Content') + message = self.get_the_message() + + self.assertEqual(message.get('subject'), '[Django] Subject') + self.assertEqual(message.get_all('to'), ['nobody@example.com']) + self.assertTrue(message.is_multipart()) + self.assertEqual(len(message.get_payload()), 2) + self.assertEqual(message.get_payload(0).get_payload(), 'Content') + self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain') + self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content') + self.assertEqual(message.get_payload(1).get_content_type(), 'text/html') + + @override_settings( + ADMINS=[('nobody', 'nobody+admin@example.com')], + MANAGERS=[('nobody', 'nobody+manager@example.com')]) + def test_manager_and_admin_mail_prefix(self): + """ + String prefix + lazy translated subject = bad output + Regression for #13494 + """ + mail_managers(ugettext_lazy('Subject'), 'Content') + message = self.get_the_message() + self.assertEqual(message.get('subject'), '[Django] Subject') + + self.flush_mailbox() + mail_admins(ugettext_lazy('Subject'), 'Content') + message = self.get_the_message() + self.assertEqual(message.get('subject'), '[Django] Subject') + + @override_settings(ADMINS=(), MANAGERS=()) + def test_empty_admins(self): + """ + Test that mail_admins/mail_managers doesn't connect to the mail server + if there are no recipients (#9383) + """ + mail_admins('hi', 'there') + self.assertEqual(self.get_mailbox_content(), []) + mail_managers('hi', 'there') + self.assertEqual(self.get_mailbox_content(), []) + + def test_message_cc_header(self): + """ + Regression test for #7722 + """ + email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com']) + mail.get_connection().send_messages([email]) + message = self.get_the_message() + self.assertStartsWith(message.as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: ') + + def test_idn_send(self): + """ + Regression test for #14301 + """ + self.assertTrue(send_mail('Subject', 'Content', 'from@öäü.com', ['to@öäü.com'])) + message = self.get_the_message() + self.assertEqual(message.get('subject'), 'Subject') + self.assertEqual(message.get('from'), 'from@xn--4ca9at.com') + self.assertEqual(message.get('to'), 'to@xn--4ca9at.com') + + self.flush_mailbox() + m = EmailMessage('Subject', 'Content', 'from@öäü.com', + ['to@öäü.com'], cc=['cc@öäü.com']) + m.send() + message = self.get_the_message() + self.assertEqual(message.get('subject'), 'Subject') + self.assertEqual(message.get('from'), 'from@xn--4ca9at.com') + self.assertEqual(message.get('to'), 'to@xn--4ca9at.com') + self.assertEqual(message.get('cc'), 'cc@xn--4ca9at.com') + + def test_recipient_without_domain(self): + """ + Regression test for #15042 + """ + self.assertTrue(send_mail("Subject", "Content", "tester", ["django"])) + message = self.get_the_message() + self.assertEqual(message.get('subject'), 'Subject') + self.assertEqual(message.get('from'), "tester") + self.assertEqual(message.get('to'), "django") + + def test_close_connection(self): + """ + Test that connection can be closed (even when not explicitely opened) + """ + conn = mail.get_connection(username='', password='') + try: + conn.close() + except Exception as e: + self.fail("close() unexpectedly raised an exception: %s" % e) + + +class LocmemBackendTests(BaseEmailBackendTests, TestCase): + email_backend = 'django.core.mail.backends.locmem.EmailBackend' + + def get_mailbox_content(self): + return [m.message() for m in mail.outbox] + + def flush_mailbox(self): + mail.outbox = [] + + def tearDown(self): + super(LocmemBackendTests, self).tearDown() + mail.outbox = [] + + def test_locmem_shared_messages(self): + """ + Make sure that the locmen backend populates the outbox. + """ + connection = locmem.EmailBackend() + connection2 = locmem.EmailBackend() + email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + connection.send_messages([email]) + connection2.send_messages([email]) + self.assertEqual(len(mail.outbox), 2) + + def test_validate_multiline_headers(self): + # Ticket #18861 - Validate emails when using the locmem backend + with self.assertRaises(BadHeaderError): + send_mail('Subject\nMultiline', 'Content', 'from@example.com', ['to@example.com']) + + +class FileBackendTests(BaseEmailBackendTests, TestCase): + email_backend = 'django.core.mail.backends.filebased.EmailBackend' + + def setUp(self): + super(FileBackendTests, self).setUp() + self.tmp_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.tmp_dir) + self._settings_override = override_settings(EMAIL_FILE_PATH=self.tmp_dir) + self._settings_override.enable() + + def tearDown(self): + self._settings_override.disable() + super(FileBackendTests, self).tearDown() + + def flush_mailbox(self): + for filename in os.listdir(self.tmp_dir): + os.unlink(os.path.join(self.tmp_dir, filename)) + + def get_mailbox_content(self): + messages = [] + for filename in os.listdir(self.tmp_dir): + with open(os.path.join(self.tmp_dir, filename), 'r') as fp: + session = force_text(fp.read()).split('\n' + ('-' * 79) + '\n') + messages.extend(email.message_from_string(force_str(m)) for m in session if m) + return messages + + def test_file_sessions(self): + """Make sure opening a connection creates a new file""" + msg = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'}) + connection = mail.get_connection() + connection.send_messages([msg]) + + self.assertEqual(len(os.listdir(self.tmp_dir)), 1) + with open(os.path.join(self.tmp_dir, os.listdir(self.tmp_dir)[0])) as fp: + message = email.message_from_file(fp) + self.assertEqual(message.get_content_type(), 'text/plain') + self.assertEqual(message.get('subject'), 'Subject') + self.assertEqual(message.get('from'), 'from@example.com') + self.assertEqual(message.get('to'), 'to@example.com') + + connection2 = mail.get_connection() + connection2.send_messages([msg]) + self.assertEqual(len(os.listdir(self.tmp_dir)), 2) + + connection.send_messages([msg]) + self.assertEqual(len(os.listdir(self.tmp_dir)), 2) + + msg.connection = mail.get_connection() + self.assertTrue(connection.open()) + msg.send() + self.assertEqual(len(os.listdir(self.tmp_dir)), 3) + msg.send() + self.assertEqual(len(os.listdir(self.tmp_dir)), 3) + + connection.close() + + +class ConsoleBackendTests(BaseEmailBackendTests, TestCase): + email_backend = 'django.core.mail.backends.console.EmailBackend' + + def setUp(self): + super(ConsoleBackendTests, self).setUp() + self.__stdout = sys.stdout + self.stream = sys.stdout = StringIO() + + def tearDown(self): + del self.stream + sys.stdout = self.__stdout + del self.__stdout + super(ConsoleBackendTests, self).tearDown() + + def flush_mailbox(self): + self.stream = sys.stdout = StringIO() + + def get_mailbox_content(self): + messages = force_text(self.stream.getvalue()).split('\n' + ('-' * 79) + '\n') + return [email.message_from_string(force_str(m)) for m in messages if m] + + def test_console_stream_kwarg(self): + """ + Test that the console backend can be pointed at an arbitrary stream. + """ + s = StringIO() + connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s) + send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection) + self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: 7bit\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: ')) + + +class FakeSMTPServer(smtpd.SMTPServer, threading.Thread): + """ + Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from: + http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup + """ + + def __init__(self, *args, **kwargs): + threading.Thread.__init__(self) + smtpd.SMTPServer.__init__(self, *args, **kwargs) + self._sink = [] + self.active = False + self.active_lock = threading.Lock() + self.sink_lock = threading.Lock() + + def process_message(self, peer, mailfrom, rcpttos, data): + m = email.message_from_string(data) + if PY3: + maddr = email.utils.parseaddr(m.get('from'))[1] + else: + maddr = email.Utils.parseaddr(m.get('from'))[1] + if mailfrom != maddr: + return "553 '%s' != '%s'" % (mailfrom, maddr) + with self.sink_lock: + self._sink.append(m) + + def get_sink(self): + with self.sink_lock: + return self._sink[:] + + def flush_sink(self): + with self.sink_lock: + self._sink[:] = [] + + def start(self): + assert not self.active + self.__flag = threading.Event() + threading.Thread.start(self) + self.__flag.wait() + + def run(self): + self.active = True + self.__flag.set() + while self.active and asyncore.socket_map: + with self.active_lock: + asyncore.loop(timeout=0.1, count=1) + asyncore.close_all() + + def stop(self): + if self.active: + self.active = False + self.join() + + +class SMTPBackendTests(BaseEmailBackendTests, TestCase): + email_backend = 'django.core.mail.backends.smtp.EmailBackend' + + @classmethod + def setUpClass(cls): + cls.server = FakeSMTPServer(('127.0.0.1', 0), None) + cls._settings_override = override_settings( + EMAIL_HOST="127.0.0.1", + EMAIL_PORT=cls.server.socket.getsockname()[1]) + cls._settings_override.enable() + cls.server.start() + + @classmethod + def tearDownClass(cls): + cls._settings_override.disable() + cls.server.stop() + + def setUp(self): + super(SMTPBackendTests, self).setUp() + self.server.flush_sink() + + def tearDown(self): + self.server.flush_sink() + super(SMTPBackendTests, self).tearDown() + + def flush_mailbox(self): + self.server.flush_sink() + + def get_mailbox_content(self): + return self.server.get_sink() + + @override_settings(EMAIL_HOST_USER="not empty username", + EMAIL_HOST_PASSWORD="not empty password") + def test_email_authentication_use_settings(self): + backend = smtp.EmailBackend() + self.assertEqual(backend.username, 'not empty username') + self.assertEqual(backend.password, 'not empty password') + + @override_settings(EMAIL_HOST_USER="not empty username", + EMAIL_HOST_PASSWORD="not empty password") + def test_email_authentication_override_settings(self): + backend = smtp.EmailBackend(username='username', password='password') + self.assertEqual(backend.username, 'username') + self.assertEqual(backend.password, 'password') + + @override_settings(EMAIL_HOST_USER="not empty username", + EMAIL_HOST_PASSWORD="not empty password") + def test_email_disabled_authentication(self): + backend = smtp.EmailBackend(username='', password='') + self.assertEqual(backend.username, '') + self.assertEqual(backend.password, '') + + def test_server_stopped(self): + """ + Test that closing the backend while the SMTP server is stopped doesn't + raise an exception. + """ + backend = smtp.EmailBackend(username='', password='') + backend.open() + self.server.stop() + try: + backend.close() + except Exception as e: + self.fail("close() unexpectedly raised an exception: %s" % e) |