diff options
author | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2021-10-15 09:58:35 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-15 09:58:35 +0200 |
commit | 569a33579c3cca5f801c544d9b52a34e3c779424 (patch) | |
tree | 7c7f6660fee4eb870a04dc48eb81cb7c2a1847a0 /tests/mail | |
parent | e567670b1abe61af4acfaa6a6a7e92a7acfa8b00 (diff) | |
download | django-569a33579c3cca5f801c544d9b52a34e3c779424.tar.gz |
Refs #32074 -- Removed usage of deprecated asyncore and smtpd modules.
asyncore and smtpd modules were deprecated in Python 3.10.
Diffstat (limited to 'tests/mail')
-rw-r--r-- | tests/mail/tests.py | 138 |
1 files changed, 56 insertions, 82 deletions
diff --git a/tests/mail/tests.py b/tests/mail/tests.py index b9b3e452d5..9584b450b3 100644 --- a/tests/mail/tests.py +++ b/tests/mail/tests.py @@ -1,11 +1,9 @@ -import asyncore import mimetypes import os import shutil -import smtpd +import socket import sys import tempfile -import threading from email import charset, message_from_binary_file, message_from_bytes from email.header import Header from email.mime.text import MIMEText @@ -14,7 +12,7 @@ from io import StringIO from pathlib import Path from smtplib import SMTP, SMTPException from ssl import SSLError -from unittest import mock +from unittest import mock, skipUnless from django.core import mail from django.core.mail import ( @@ -27,6 +25,12 @@ from django.test import SimpleTestCase, override_settings from django.test.utils import requires_tz_support from django.utils.translation import gettext_lazy +try: + from aiosmtpd.controller import Controller + HAS_AIOSMTPD = True +except ImportError: + HAS_AIOSMTPD = False + class HeadersCheckMixin: @@ -1336,109 +1340,78 @@ class ConsoleBackendTests(BaseEmailBackendTests, SimpleTestCase): self.assertIn(b'\nDate: ', message) -class FakeSMTPChannel(smtpd.SMTPChannel): - - def collect_incoming_data(self, data): - try: - smtpd.SMTPChannel.collect_incoming_data(self, data) - except UnicodeDecodeError: - # Ignore decode error in SSL/TLS connection tests as the test only - # cares whether the connection attempt was made. - pass +class SMTPHandler: + def __init__(self, *args, **kwargs): + self.mailbox = [] + async def handle_DATA(self, server, session, envelope): + data = envelope.content + mail_from = envelope.mail_from -class FakeSMTPServer(smtpd.SMTPServer, threading.Thread): - """ - Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from: - http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup - """ - channel_class = FakeSMTPChannel - - def __init__(self, *args, **kwargs): - threading.Thread.__init__(self) - smtpd.SMTPServer.__init__(self, *args, decode_data=True, **kwargs) - self._sink = [] - self.active = False - self.active_lock = threading.Lock() - self.sink_lock = threading.Lock() - - def process_message(self, peer, mailfrom, rcpttos, data): - data = data.encode() - m = message_from_bytes(data) - maddr = parseaddr(m.get('from'))[1] - - if mailfrom != maddr: - # According to the spec, mailfrom does not necessarily match the + message = message_from_bytes(data.rstrip()) + message_addr = parseaddr(message.get('from'))[1] + if mail_from != message_addr: + # According to the spec, mail_from does not necessarily match the # From header - this is the case where the local part isn't # encoded, so try to correct that. - lp, domain = mailfrom.split('@', 1) + lp, domain = mail_from.split('@', 1) lp = Header(lp, 'utf-8').encode() - mailfrom = '@'.join([lp, domain]) - - if mailfrom != maddr: - return "553 '%s' != '%s'" % (mailfrom, maddr) - with self.sink_lock: - self._sink.append(m) - - def get_sink(self): - with self.sink_lock: - return self._sink[:] - - def flush_sink(self): - with self.sink_lock: - self._sink[:] = [] + mail_from = '@'.join([lp, domain]) - def start(self): - assert not self.active - self.__flag = threading.Event() - threading.Thread.start(self) - self.__flag.wait() + if mail_from != message_addr: + return f"553 '{mail_from}' != '{message_addr}'" + self.mailbox.append(message) + return '250 OK' - def run(self): - self.active = True - self.__flag.set() - while self.active and asyncore.socket_map: - with self.active_lock: - asyncore.loop(timeout=0.1, count=1) - asyncore.close_all() - - def stop(self): - if self.active: - self.active = False - self.join() + def flush_mailbox(self): + self.mailbox[:] = [] +@skipUnless(HAS_AIOSMTPD, 'No aiosmtpd library detected.') class SMTPBackendTestsBase(SimpleTestCase): @classmethod def setUpClass(cls): super().setUpClass() - cls.server = FakeSMTPServer(('127.0.0.1', 0), None) + # Find a free port. + with socket.socket() as s: + s.bind(('127.0.0.1', 0)) + port = s.getsockname()[1] + cls.smtp_handler = SMTPHandler() + cls.smtp_controller = Controller( + cls.smtp_handler, hostname='127.0.0.1', port=port, + ) cls._settings_override = override_settings( - EMAIL_HOST="127.0.0.1", - EMAIL_PORT=cls.server.socket.getsockname()[1]) + EMAIL_HOST=cls.smtp_controller.hostname, + EMAIL_PORT=cls.smtp_controller.port, + ) cls._settings_override.enable() cls.addClassCleanup(cls._settings_override.disable) - cls.server.start() - cls.addClassCleanup(cls.server.stop) + cls.smtp_controller.start() + cls.addClassCleanup(cls.stop_smtp) + + @classmethod + def stop_smtp(cls): + cls.smtp_controller.stop() +@skipUnless(HAS_AIOSMTPD, 'No aiosmtpd library detected.') class SMTPBackendTests(BaseEmailBackendTests, SMTPBackendTestsBase): email_backend = 'django.core.mail.backends.smtp.EmailBackend' def setUp(self): super().setUp() - self.server.flush_sink() + self.smtp_handler.flush_mailbox() def tearDown(self): - self.server.flush_sink() + self.smtp_handler.flush_mailbox() super().tearDown() def flush_mailbox(self): - self.server.flush_sink() + self.smtp_handler.flush_mailbox() def get_mailbox_content(self): - return self.server.get_sink() + return self.smtp_handler.mailbox @override_settings( EMAIL_HOST_USER="not empty username", @@ -1657,17 +1630,18 @@ class SMTPBackendTests(BaseEmailBackendTests, SMTPBackendTestsBase): self.assertEqual(sent, 0) +@skipUnless(HAS_AIOSMTPD, 'No aiosmtpd library detected.') class SMTPBackendStoppedServerTests(SMTPBackendTestsBase): - """ - These tests require a separate class, because the FakeSMTPServer is shut - down in setUpClass(), and it cannot be restarted ("RuntimeError: threads - can only be started once"). - """ @classmethod def setUpClass(cls): super().setUpClass() cls.backend = smtp.EmailBackend(username='', password='') - cls.server.stop() + cls.smtp_controller.stop() + + @classmethod + def stop_smtp(cls): + # SMTP controller is stopped in setUpClass(). + pass def test_server_stopped(self): """ |