summaryrefslogtreecommitdiff
path: root/tests/backends/mysql
diff options
context:
space:
mode:
authorMariusz Felisiak <felisiak.mariusz@gmail.com>2017-02-11 21:37:49 +0100
committerTim Graham <timograham@gmail.com>2017-06-21 12:00:47 -0400
commit8cb1b1fd8e529d1896daeb089ea726109e0ba4f7 (patch)
tree3337abead0afac6a46b0ccfd48bc530aec3717de /tests/backends/mysql
parent0f91ba1adc037345474749faa64d36a4077b3fb8 (diff)
downloaddjango-8cb1b1fd8e529d1896daeb089ea726109e0ba4f7.tar.gz
Reorganized backends tests.
Diffstat (limited to 'tests/backends/mysql')
-rw-r--r--tests/backends/mysql/__init__.py0
-rw-r--r--tests/backends/mysql/test_creation.py45
-rw-r--r--tests/backends/mysql/test_features.py19
-rw-r--r--tests/backends/mysql/tests.py91
4 files changed, 155 insertions, 0 deletions
diff --git a/tests/backends/mysql/__init__.py b/tests/backends/mysql/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/backends/mysql/__init__.py
diff --git a/tests/backends/mysql/test_creation.py b/tests/backends/mysql/test_creation.py
new file mode 100644
index 0000000000..e3a83346fe
--- /dev/null
+++ b/tests/backends/mysql/test_creation.py
@@ -0,0 +1,45 @@
+import unittest
+from io import StringIO
+from unittest import mock
+
+from django.db import connection
+from django.db.backends.base.creation import BaseDatabaseCreation
+from django.db.backends.mysql.creation import DatabaseCreation
+from django.db.utils import DatabaseError
+from django.test import SimpleTestCase
+
+
+@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL tests')
+class DatabaseCreationTests(SimpleTestCase):
+
+ def _execute_raise_database_exists(self, cursor, parameters, keepdb=False):
+ raise DatabaseError(1007, "Can't create database '%s'; database exists" % parameters['dbname'])
+
+ def _execute_raise_access_denied(self, cursor, parameters, keepdb=False):
+ raise DatabaseError(1044, "Access denied for user")
+
+ def patch_test_db_creation(self, execute_create_test_db):
+ return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
+
+ @mock.patch('sys.stdout', new_callable=StringIO)
+ @mock.patch('sys.stderr', new_callable=StringIO)
+ def test_create_test_db_database_exists(self, *mocked_objects):
+ # Simulate test database creation raising "database exists"
+ creation = DatabaseCreation(connection)
+ with self.patch_test_db_creation(self._execute_raise_database_exists):
+ with mock.patch('builtins.input', return_value='no'):
+ with self.assertRaises(SystemExit):
+ # SystemExit is raised if the user answers "no" to the
+ # prompt asking if it's okay to delete the test database.
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
+ # "Database exists" shouldn't appear when keepdb is on
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
+
+ @mock.patch('sys.stdout', new_callable=StringIO)
+ @mock.patch('sys.stderr', new_callable=StringIO)
+ def test_create_test_db_unexpected_error(self, *mocked_objects):
+ # Simulate test database creation raising unexpected error
+ creation = DatabaseCreation(connection)
+ with self.patch_test_db_creation(self._execute_raise_access_denied):
+ with self.assertRaises(SystemExit):
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
diff --git a/tests/backends/mysql/test_features.py b/tests/backends/mysql/test_features.py
new file mode 100644
index 0000000000..65c897823b
--- /dev/null
+++ b/tests/backends/mysql/test_features.py
@@ -0,0 +1,19 @@
+from unittest import mock, skipUnless
+
+from django.db import connection
+from django.test import TestCase
+
+
+@skipUnless(connection.vendor == 'mysql', 'MySQL tests')
+class TestFeatures(TestCase):
+
+ def test_supports_transactions(self):
+ """
+ All storage engines except MyISAM support transactions.
+ """
+ with mock.patch('django.db.connection.features._mysql_storage_engine', 'InnoDB'):
+ self.assertTrue(connection.features.supports_transactions)
+ del connection.features.supports_transactions
+ with mock.patch('django.db.connection.features._mysql_storage_engine', 'MyISAM'):
+ self.assertFalse(connection.features.supports_transactions)
+ del connection.features.supports_transactions
diff --git a/tests/backends/mysql/tests.py b/tests/backends/mysql/tests.py
new file mode 100644
index 0000000000..c9d47eb012
--- /dev/null
+++ b/tests/backends/mysql/tests.py
@@ -0,0 +1,91 @@
+import unittest
+from contextlib import contextmanager
+
+from django.core.exceptions import ImproperlyConfigured
+from django.db import connection
+from django.test import TestCase, override_settings
+
+
+@contextmanager
+def get_connection():
+ new_connection = connection.copy()
+ yield new_connection
+ new_connection.close()
+
+
+@override_settings(DEBUG=True)
+@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL tests')
+class IsolationLevelTests(TestCase):
+
+ read_committed = 'read committed'
+ repeatable_read = 'repeatable read'
+ isolation_values = {
+ level: level.replace(' ', '-').upper()
+ for level in (read_committed, repeatable_read)
+ }
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ configured_isolation_level = connection.isolation_level or cls.isolation_values[cls.repeatable_read]
+ cls.configured_isolation_level = configured_isolation_level.upper()
+ cls.other_isolation_level = (
+ cls.read_committed
+ if configured_isolation_level != cls.isolation_values[cls.read_committed]
+ else cls.repeatable_read
+ )
+
+ @staticmethod
+ def get_isolation_level(connection):
+ with connection.cursor() as cursor:
+ cursor.execute("SELECT @@session.tx_isolation")
+ return cursor.fetchone()[0]
+
+ def test_auto_is_null_auto_config(self):
+ query = 'set sql_auto_is_null = 0'
+ connection.init_connection_state()
+ last_query = connection.queries[-1]['sql'].lower()
+ if connection.features.is_sql_auto_is_null_enabled:
+ self.assertIn(query, last_query)
+ else:
+ self.assertNotIn(query, last_query)
+
+ def test_connect_isolation_level(self):
+ self.assertEqual(self.get_isolation_level(connection), self.configured_isolation_level)
+
+ def test_setting_isolation_level(self):
+ with get_connection() as new_connection:
+ new_connection.settings_dict['OPTIONS']['isolation_level'] = self.other_isolation_level
+ self.assertEqual(
+ self.get_isolation_level(new_connection),
+ self.isolation_values[self.other_isolation_level]
+ )
+
+ def test_uppercase_isolation_level(self):
+ # Upper case values are also accepted in 'isolation_level'.
+ with get_connection() as new_connection:
+ new_connection.settings_dict['OPTIONS']['isolation_level'] = self.other_isolation_level.upper()
+ self.assertEqual(
+ self.get_isolation_level(new_connection),
+ self.isolation_values[self.other_isolation_level]
+ )
+
+ def test_default_isolation_level(self):
+ # If not specified in settings, the default is read committed.
+ with get_connection() as new_connection:
+ new_connection.settings_dict['OPTIONS'].pop('isolation_level', None)
+ self.assertEqual(
+ self.get_isolation_level(new_connection),
+ self.isolation_values[self.read_committed]
+ )
+
+ def test_isolation_level_validation(self):
+ new_connection = connection.copy()
+ new_connection.settings_dict['OPTIONS']['isolation_level'] = 'xxx'
+ msg = (
+ "Invalid transaction isolation level 'xxx' specified.\n"
+ "Use one of 'read committed', 'read uncommitted', "
+ "'repeatable read', 'serializable', or None."
+ )
+ with self.assertRaisesMessage(ImproperlyConfigured, msg):
+ new_connection.cursor()