summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/backends/base/__init__.py0
-rw-r--r--tests/backends/base/test_base.py32
-rw-r--r--tests/backends/base/test_creation.py42
-rw-r--r--tests/backends/base/test_features.py8
-rw-r--r--tests/backends/mysql/__init__.py0
-rw-r--r--tests/backends/mysql/test_creation.py45
-rw-r--r--tests/backends/mysql/test_features.py (renamed from tests/backends/test_features.py)12
-rw-r--r--tests/backends/mysql/tests.py (renamed from tests/backends/test_mysql.py)4
-rw-r--r--tests/backends/oracle/__init__.py0
-rw-r--r--tests/backends/oracle/test_creation.py76
-rw-r--r--tests/backends/oracle/tests.py55
-rw-r--r--tests/backends/postgresql/__init__.py0
-rw-r--r--tests/backends/postgresql/test_creation.py95
-rw-r--r--tests/backends/postgresql/test_server_side_cursors.py (renamed from tests/backends/test_postgresql.py)4
-rw-r--r--tests/backends/postgresql/tests.py147
-rw-r--r--tests/backends/sqlite/__init__.py0
-rw-r--r--tests/backends/sqlite/tests.py138
-rw-r--r--tests/backends/test_creation.py244
-rw-r--r--tests/backends/test_utils.py59
-rw-r--r--tests/backends/tests.py431
-rw-r--r--tests/db_utils/tests.py49
21 files changed, 731 insertions, 710 deletions
diff --git a/tests/backends/base/__init__.py b/tests/backends/base/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/backends/base/__init__.py
diff --git a/tests/backends/base/test_base.py b/tests/backends/base/test_base.py
new file mode 100644
index 0000000000..15cfbb8579
--- /dev/null
+++ b/tests/backends/base/test_base.py
@@ -0,0 +1,32 @@
+from django.db import DEFAULT_DB_ALIAS, connection, connections
+from django.db.backends.base.base import BaseDatabaseWrapper
+from django.test import SimpleTestCase
+
+
+class DatabaseWrapperTests(SimpleTestCase):
+
+ def test_initialization_class_attributes(self):
+ """
+ The "initialization" class attributes like client_class and
+ creation_class should be set on the class and reflected in the
+ corresponding instance attributes of the instantiated backend.
+ """
+ conn = connections[DEFAULT_DB_ALIAS]
+ conn_class = type(conn)
+ attr_names = [
+ ('client_class', 'client'),
+ ('creation_class', 'creation'),
+ ('features_class', 'features'),
+ ('introspection_class', 'introspection'),
+ ('ops_class', 'ops'),
+ ('validation_class', 'validation'),
+ ]
+ for class_attr_name, instance_attr_name in attr_names:
+ class_attr_value = getattr(conn_class, class_attr_name)
+ self.assertIsNotNone(class_attr_value)
+ instance_attr_value = getattr(conn, instance_attr_name)
+ self.assertIsInstance(instance_attr_value, class_attr_value)
+
+ def test_initialization_display_name(self):
+ self.assertEqual(BaseDatabaseWrapper.display_name, 'unknown')
+ self.assertNotEqual(connection.display_name, 'unknown')
diff --git a/tests/backends/base/test_creation.py b/tests/backends/base/test_creation.py
new file mode 100644
index 0000000000..519b3f049c
--- /dev/null
+++ b/tests/backends/base/test_creation.py
@@ -0,0 +1,42 @@
+import copy
+
+from django.db import DEFAULT_DB_ALIAS, connections
+from django.db.backends.base.creation import (
+ TEST_DATABASE_PREFIX, BaseDatabaseCreation,
+)
+from django.test import SimpleTestCase
+
+
+class TestDbSignatureTests(SimpleTestCase):
+
+ def get_connection_copy(self):
+ # Get a copy of the default connection. (Can't use django.db.connection
+ # because it'll modify the default connection itself.)
+ test_connection = copy.copy(connections[DEFAULT_DB_ALIAS])
+ test_connection.settings_dict = copy.copy(connections[DEFAULT_DB_ALIAS].settings_dict)
+ return test_connection
+
+ def test_default_name(self):
+ # A test db name isn't set.
+ prod_name = 'hodor'
+ test_connection = self.get_connection_copy()
+ test_connection.settings_dict['NAME'] = prod_name
+ test_connection.settings_dict['TEST'] = {'NAME': None}
+ signature = BaseDatabaseCreation(test_connection).test_db_signature()
+ self.assertEqual(signature[3], TEST_DATABASE_PREFIX + prod_name)
+
+ def test_custom_test_name(self):
+ # A regular test db name is set.
+ test_name = 'hodor'
+ test_connection = self.get_connection_copy()
+ test_connection.settings_dict['TEST'] = {'NAME': test_name}
+ signature = BaseDatabaseCreation(test_connection).test_db_signature()
+ self.assertEqual(signature[3], test_name)
+
+ def test_custom_test_name_with_test_prefix(self):
+ # A test db name prefixed with TEST_DATABASE_PREFIX is set.
+ test_name = TEST_DATABASE_PREFIX + 'hodor'
+ test_connection = self.get_connection_copy()
+ test_connection.settings_dict['TEST'] = {'NAME': test_name}
+ signature = BaseDatabaseCreation(test_connection).test_db_signature()
+ self.assertEqual(signature[3], test_name)
diff --git a/tests/backends/base/test_features.py b/tests/backends/base/test_features.py
new file mode 100644
index 0000000000..831a0002a3
--- /dev/null
+++ b/tests/backends/base/test_features.py
@@ -0,0 +1,8 @@
+from django.db import connection
+from django.test import TestCase
+
+
+class TestDatabaseFeatures(TestCase):
+
+ def test_nonexistent_feature(self):
+ self.assertFalse(hasattr(connection.features, 'nonexistent'))
diff --git a/tests/backends/mysql/__init__.py b/tests/backends/mysql/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/backends/mysql/__init__.py
diff --git a/tests/backends/mysql/test_creation.py b/tests/backends/mysql/test_creation.py
new file mode 100644
index 0000000000..e3a83346fe
--- /dev/null
+++ b/tests/backends/mysql/test_creation.py
@@ -0,0 +1,45 @@
+import unittest
+from io import StringIO
+from unittest import mock
+
+from django.db import connection
+from django.db.backends.base.creation import BaseDatabaseCreation
+from django.db.backends.mysql.creation import DatabaseCreation
+from django.db.utils import DatabaseError
+from django.test import SimpleTestCase
+
+
+@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL tests')
+class DatabaseCreationTests(SimpleTestCase):
+
+ def _execute_raise_database_exists(self, cursor, parameters, keepdb=False):
+ raise DatabaseError(1007, "Can't create database '%s'; database exists" % parameters['dbname'])
+
+ def _execute_raise_access_denied(self, cursor, parameters, keepdb=False):
+ raise DatabaseError(1044, "Access denied for user")
+
+ def patch_test_db_creation(self, execute_create_test_db):
+ return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
+
+ @mock.patch('sys.stdout', new_callable=StringIO)
+ @mock.patch('sys.stderr', new_callable=StringIO)
+ def test_create_test_db_database_exists(self, *mocked_objects):
+ # Simulate test database creation raising "database exists"
+ creation = DatabaseCreation(connection)
+ with self.patch_test_db_creation(self._execute_raise_database_exists):
+ with mock.patch('builtins.input', return_value='no'):
+ with self.assertRaises(SystemExit):
+ # SystemExit is raised if the user answers "no" to the
+ # prompt asking if it's okay to delete the test database.
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
+ # "Database exists" shouldn't appear when keepdb is on
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
+
+ @mock.patch('sys.stdout', new_callable=StringIO)
+ @mock.patch('sys.stderr', new_callable=StringIO)
+ def test_create_test_db_unexpected_error(self, *mocked_objects):
+ # Simulate test database creation raising unexpected error
+ creation = DatabaseCreation(connection)
+ with self.patch_test_db_creation(self._execute_raise_access_denied):
+ with self.assertRaises(SystemExit):
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
diff --git a/tests/backends/test_features.py b/tests/backends/mysql/test_features.py
index 7bd57f0423..65c897823b 100644
--- a/tests/backends/test_features.py
+++ b/tests/backends/mysql/test_features.py
@@ -4,16 +4,10 @@ from django.db import connection
from django.test import TestCase
-class TestDatabaseFeatures(TestCase):
+@skipUnless(connection.vendor == 'mysql', 'MySQL tests')
+class TestFeatures(TestCase):
- def test_nonexistent_feature(self):
- self.assertFalse(hasattr(connection.features, 'nonexistent'))
-
-
-@skipUnless(connection.vendor == 'mysql', 'MySQL backend tests')
-class TestMySQLFeatures(TestCase):
-
- def test_mysql_supports_transactions(self):
+ def test_supports_transactions(self):
"""
All storage engines except MyISAM support transactions.
"""
diff --git a/tests/backends/test_mysql.py b/tests/backends/mysql/tests.py
index 298ca9265f..c9d47eb012 100644
--- a/tests/backends/test_mysql.py
+++ b/tests/backends/mysql/tests.py
@@ -14,8 +14,8 @@ def get_connection():
@override_settings(DEBUG=True)
-@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL specific test.')
-class MySQLTests(TestCase):
+@unittest.skipUnless(connection.vendor == 'mysql', 'MySQL tests')
+class IsolationLevelTests(TestCase):
read_committed = 'read committed'
repeatable_read = 'repeatable read'
diff --git a/tests/backends/oracle/__init__.py b/tests/backends/oracle/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/backends/oracle/__init__.py
diff --git a/tests/backends/oracle/test_creation.py b/tests/backends/oracle/test_creation.py
new file mode 100644
index 0000000000..1688c4efd2
--- /dev/null
+++ b/tests/backends/oracle/test_creation.py
@@ -0,0 +1,76 @@
+import unittest
+from io import StringIO
+from unittest import mock
+
+from django.db import connection
+from django.db.backends.oracle.creation import DatabaseCreation
+from django.db.utils import DatabaseError
+from django.test import TestCase
+
+
+@unittest.skipUnless(connection.vendor == 'oracle', 'Oracle tests')
+@mock.patch.object(DatabaseCreation, '_maindb_connection', return_value=connection)
+@mock.patch('sys.stdout', new_callable=StringIO)
+@mock.patch('sys.stderr', new_callable=StringIO)
+class DatabaseCreationTests(TestCase):
+
+ def _execute_raise_user_already_exists(self, cursor, statements, parameters, verbosity, allow_quiet_fail=False):
+ # Raise "user already exists" only in test user creation
+ if statements and statements[0].startswith('CREATE USER'):
+ raise DatabaseError("ORA-01920: user name 'string' conflicts with another user or role name")
+
+ def _execute_raise_tablespace_already_exists(
+ self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
+ ):
+ raise DatabaseError("ORA-01543: tablespace 'string' already exists")
+
+ def _execute_raise_insufficient_privileges(
+ self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
+ ):
+ raise DatabaseError("ORA-01031: insufficient privileges")
+
+ def _test_database_passwd(self):
+ # Mocked to avoid test user password changed
+ return connection.settings_dict['SAVED_PASSWORD']
+
+ def patch_execute_statements(self, execute_statements):
+ return mock.patch.object(DatabaseCreation, '_execute_statements', execute_statements)
+
+ @mock.patch.object(DatabaseCreation, '_test_user_create', return_value=False)
+ def test_create_test_db(self, *mocked_objects):
+ creation = DatabaseCreation(connection)
+ # Simulate test database creation raising "tablespace already exists"
+ with self.patch_execute_statements(self._execute_raise_tablespace_already_exists):
+ with mock.patch('builtins.input', return_value='no'):
+ with self.assertRaises(SystemExit):
+ # SystemExit is raised if the user answers "no" to the
+ # prompt asking if it's okay to delete the test tablespace.
+ creation._create_test_db(verbosity=0, keepdb=False)
+ # "Tablespace already exists" error is ignored when keepdb is on
+ creation._create_test_db(verbosity=0, keepdb=True)
+ # Simulate test database creation raising unexpected error
+ with self.patch_execute_statements(self._execute_raise_insufficient_privileges):
+ with self.assertRaises(SystemExit):
+ creation._create_test_db(verbosity=0, keepdb=False)
+ with self.assertRaises(SystemExit):
+ creation._create_test_db(verbosity=0, keepdb=True)
+
+ @mock.patch.object(DatabaseCreation, '_test_database_create', return_value=False)
+ def test_create_test_user(self, *mocked_objects):
+ creation = DatabaseCreation(connection)
+ with mock.patch.object(DatabaseCreation, '_test_database_passwd', self._test_database_passwd):
+ # Simulate test user creation raising "user already exists"
+ with self.patch_execute_statements(self._execute_raise_user_already_exists):
+ with mock.patch('builtins.input', return_value='no'):
+ with self.assertRaises(SystemExit):
+ # SystemExit is raised if the user answers "no" to the
+ # prompt asking if it's okay to delete the test user.
+ creation._create_test_db(verbosity=0, keepdb=False)
+ # "User already exists" error is ignored when keepdb is on
+ creation._create_test_db(verbosity=0, keepdb=True)
+ # Simulate test user creation raising unexpected error
+ with self.patch_execute_statements(self._execute_raise_insufficient_privileges):
+ with self.assertRaises(SystemExit):
+ creation._create_test_db(verbosity=0, keepdb=False)
+ with self.assertRaises(SystemExit):
+ creation._create_test_db(verbosity=0, keepdb=True)
diff --git a/tests/backends/oracle/tests.py b/tests/backends/oracle/tests.py
new file mode 100644
index 0000000000..d57d91e677
--- /dev/null
+++ b/tests/backends/oracle/tests.py
@@ -0,0 +1,55 @@
+import unittest
+
+from django.db import connection
+
+
+@unittest.skipUnless(connection.vendor == 'oracle', 'Oracle tests')
+class Tests(unittest.TestCase):
+
+ def test_quote_name(self):
+ """'%' chars are escaped for query execution."""
+ name = '"SOME%NAME"'
+ quoted_name = connection.ops.quote_name(name)
+ self.assertEqual(quoted_name % (), name)
+
+ def test_dbms_session(self):
+ """A stored procedure can be called through a cursor wrapper."""
+ with connection.cursor() as cursor:
+ cursor.callproc('DBMS_SESSION.SET_IDENTIFIER', ['_django_testing!'])
+
+ def test_cursor_var(self):
+ """Cursor variables can be passed as query parameters."""
+ from django.db.backends.oracle.base import Database
+ with connection.cursor() as cursor:
+ var = cursor.var(Database.STRING)
+ cursor.execute("BEGIN %s := 'X'; END; ", [var])
+ self.assertEqual(var.getvalue(), 'X')
+
+ def test_long_string(self):
+ """Text longer than 4000 chars can be saved and read."""
+ with connection.cursor() as cursor:
+ cursor.execute('CREATE TABLE ltext ("TEXT" NCLOB)')
+ long_str = ''.join(str(x) for x in range(4000))
+ cursor.execute('INSERT INTO ltext VALUES (%s)', [long_str])
+ cursor.execute('SELECT text FROM ltext')
+ row = cursor.fetchone()
+ self.assertEqual(long_str, row[0].read())
+ cursor.execute('DROP TABLE ltext')
+
+ def test_client_encoding(self):
+ """Client encoding is set correctly."""
+ connection.ensure_connection()
+ self.assertEqual(connection.connection.encoding, 'UTF-8')
+ self.assertEqual(connection.connection.nencoding, 'UTF-8')
+
+ def test_order_of_nls_parameters(self):
+ """
+ An 'almost right' datetime works with configured NLS parameters
+ (#18465).
+ """
+ with connection.cursor() as cursor:
+ query = "select 1 from dual where '1936-12-29 00:00' < sysdate"
+ # The query succeeds without errors - pre #18465 this
+ # wasn't the case.
+ cursor.execute(query)
+ self.assertEqual(cursor.fetchone()[0], 1)
diff --git a/tests/backends/postgresql/__init__.py b/tests/backends/postgresql/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/backends/postgresql/__init__.py
diff --git a/tests/backends/postgresql/test_creation.py b/tests/backends/postgresql/test_creation.py
new file mode 100644
index 0000000000..9554e97a54
--- /dev/null
+++ b/tests/backends/postgresql/test_creation.py
@@ -0,0 +1,95 @@
+import unittest
+from contextlib import contextmanager
+from io import StringIO
+from unittest import mock
+
+from django.db import connection
+from django.db.backends.base.creation import BaseDatabaseCreation
+from django.db.utils import DatabaseError
+from django.test import SimpleTestCase
+
+try:
+ import psycopg2 # NOQA
+except ImportError:
+ pass
+else:
+ from psycopg2 import errorcodes
+ from django.db.backends.postgresql.creation import DatabaseCreation
+
+
+@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests')
+class DatabaseCreationTests(SimpleTestCase):
+
+ @contextmanager
+ def changed_test_settings(self, **kwargs):
+ settings = connection.settings_dict['TEST']
+ saved_values = {}
+ for name in kwargs:
+ if name in settings:
+ saved_values[name] = settings[name]
+
+ for name, value in kwargs.items():
+ settings[name] = value
+ try:
+ yield
+ finally:
+ for name, value in kwargs.items():
+ if name in saved_values:
+ settings[name] = saved_values[name]
+ else:
+ del settings[name]
+
+ def check_sql_table_creation_suffix(self, settings, expected):
+ with self.changed_test_settings(**settings):
+ creation = DatabaseCreation(connection)
+ suffix = creation.sql_table_creation_suffix()
+ self.assertEqual(suffix, expected)
+
+ def test_sql_table_creation_suffix_with_none_settings(self):
+ settings = {'CHARSET': None, 'TEMPLATE': None}
+ self.check_sql_table_creation_suffix(settings, "")
+
+ def test_sql_table_creation_suffix_with_encoding(self):
+ settings = {'CHARSET': 'UTF8'}
+ self.check_sql_table_creation_suffix(settings, "WITH ENCODING 'UTF8'")
+
+ def test_sql_table_creation_suffix_with_template(self):
+ settings = {'TEMPLATE': 'template0'}
+ self.check_sql_table_creation_suffix(settings, 'WITH TEMPLATE "template0"')
+
+ def test_sql_table_creation_suffix_with_encoding_and_template(self):
+ settings = {'CHARSET': 'UTF8', 'TEMPLATE': 'template0'}
+ self.check_sql_table_creation_suffix(settings, '''WITH ENCODING 'UTF8' TEMPLATE "template0"''')
+
+ def _execute_raise_database_already_exists(self, cursor, parameters, keepdb=False):
+ error = DatabaseError('database %s already exists' % parameters['dbname'])
+ error.pgcode = errorcodes.DUPLICATE_DATABASE
+ raise DatabaseError() from error
+
+ def _execute_raise_permission_denied(self, cursor, parameters, keepdb=False):
+ error = DatabaseError('permission denied to create database')
+ error.pgcode = errorcodes.INSUFFICIENT_PRIVILEGE
+ raise DatabaseError() from error
+
+ def patch_test_db_creation(self, execute_create_test_db):
+ return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
+
+ @mock.patch('sys.stdout', new_callable=StringIO)
+ @mock.patch('sys.stderr', new_callable=StringIO)
+ def test_create_test_db(self, *mocked_objects):
+ creation = DatabaseCreation(connection)
+ # Simulate test database creation raising "database already exists"
+ with self.patch_test_db_creation(self._execute_raise_database_already_exists):
+ with mock.patch('builtins.input', return_value='no'):
+ with self.assertRaises(SystemExit):
+ # SystemExit is raised if the user answers "no" to the
+ # prompt asking if it's okay to delete the test database.
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
+ # "Database already exists" error is ignored when keepdb is on
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
+ # Simulate test database creation raising unexpected error
+ with self.patch_test_db_creation(self._execute_raise_permission_denied):
+ with self.assertRaises(SystemExit):
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
+ with self.assertRaises(SystemExit):
+ creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
diff --git a/tests/backends/test_postgresql.py b/tests/backends/postgresql/test_server_side_cursors.py
index f4020f4e5c..8576686211 100644
--- a/tests/backends/test_postgresql.py
+++ b/tests/backends/postgresql/test_server_side_cursors.py
@@ -6,10 +6,10 @@ from contextlib import contextmanager
from django.db import connection
from django.test import TestCase
-from .models import Person
+from ..models import Person
-@unittest.skipUnless(connection.vendor == 'postgresql', "Test only for PostgreSQL")
+@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests')
class ServerSideCursorsPostgres(TestCase):
cursor_fields = 'name, statement, is_holdable, is_binary, is_scrollable, creation_time'
PostgresCursor = namedtuple('PostgresCursor', cursor_fields)
diff --git a/tests/backends/postgresql/tests.py b/tests/backends/postgresql/tests.py
new file mode 100644
index 0000000000..140fbbc444
--- /dev/null
+++ b/tests/backends/postgresql/tests.py
@@ -0,0 +1,147 @@
+import unittest
+import warnings
+from unittest import mock
+
+from django.db import DatabaseError, connection
+from django.test import TestCase
+
+
+@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests')
+class Tests(TestCase):
+
+ def test_nodb_connection(self):
+ """
+ The _nodb_connection property fallbacks to the default connection
+ database when access to the 'postgres' database is not granted.
+ """
+ def mocked_connect(self):
+ if self.settings_dict['NAME'] is None:
+ raise DatabaseError()
+ return ''
+
+ nodb_conn = connection._nodb_connection
+ self.assertIsNone(nodb_conn.settings_dict['NAME'])
+
+ # Now assume the 'postgres' db isn't available
+ with warnings.catch_warnings(record=True) as w:
+ with mock.patch('django.db.backends.base.base.BaseDatabaseWrapper.connect',
+ side_effect=mocked_connect, autospec=True):
+ warnings.simplefilter('always', RuntimeWarning)
+ nodb_conn = connection._nodb_connection
+ self.assertIsNotNone(nodb_conn.settings_dict['NAME'])
+ self.assertEqual(nodb_conn.settings_dict['NAME'], connection.settings_dict['NAME'])
+ # Check a RuntimeWarning has been emitted
+ self.assertEqual(len(w), 1)
+ self.assertEqual(w[0].message.__class__, RuntimeWarning)
+
+ def test_connect_and_rollback(self):
+ """
+ PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
+ transaction is rolled back (#17062).
+ """
+ new_connection = connection.copy()
+ try:
+ # Ensure the database default time zone is different than
+ # the time zone in new_connection.settings_dict. We can
+ # get the default time zone by reset & show.
+ cursor = new_connection.cursor()
+ cursor.execute("RESET TIMEZONE")
+ cursor.execute("SHOW TIMEZONE")
+ db_default_tz = cursor.fetchone()[0]
+ new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC'
+ new_connection.close()
+
+ # Invalidate timezone name cache, because the setting_changed
+ # handler cannot know about new_connection.
+ del new_connection.timezone_name
+
+ # Fetch a new connection with the new_tz as default
+ # time zone, run a query and rollback.
+ with self.settings(TIME_ZONE=new_tz):
+ new_connection.set_autocommit(False)
+ cursor = new_connection.cursor()
+ new_connection.rollback()
+
+ # Now let's see if the rollback rolled back the SET TIME ZONE.
+ cursor.execute("SHOW TIMEZONE")
+ tz = cursor.fetchone()[0]
+ self.assertEqual(new_tz, tz)
+
+ finally:
+ new_connection.close()
+
+ def test_connect_non_autocommit(self):
+ """
+ The connection wrapper shouldn't believe that autocommit is enabled
+ after setting the time zone when AUTOCOMMIT is False (#21452).
+ """
+ new_connection = connection.copy()
+ new_connection.settings_dict['AUTOCOMMIT'] = False
+
+ try:
+ # Open a database connection.
+ new_connection.cursor()
+ self.assertFalse(new_connection.get_autocommit())
+ finally:
+ new_connection.close()
+
+ def test_connect_isolation_level(self):
+ """
+ The transaction level can be configured with
+ DATABASES ['OPTIONS']['isolation_level'].
+ """
+ import psycopg2
+ from psycopg2.extensions import (
+ ISOLATION_LEVEL_READ_COMMITTED as read_committed,
+ ISOLATION_LEVEL_SERIALIZABLE as serializable,
+ )
+ # Since this is a django.test.TestCase, a transaction is in progress
+ # and the isolation level isn't reported as 0. This test assumes that
+ # PostgreSQL is configured with the default isolation level.
+
+ # Check the level on the psycopg2 connection, not the Django wrapper.
+ default_level = read_committed if psycopg2.__version__ < '2.7' else None
+ self.assertEqual(connection.connection.isolation_level, default_level)
+
+ new_connection = connection.copy()
+ new_connection.settings_dict['OPTIONS']['isolation_level'] = serializable
+ try:
+ # Start a transaction so the isolation level isn't reported as 0.
+ new_connection.set_autocommit(False)
+ # Check the level on the psycopg2 connection, not the Django wrapper.
+ self.assertEqual(new_connection.connection.isolation_level, serializable)
+ finally:
+ new_connection.close()
+
+ def _select(self, val):
+ with connection.cursor() as cursor:
+ cursor.execute('SELECT %s', (val,))
+ return cursor.fetchone()[0]
+
+ def test_select_ascii_array(self):
+ a = ['awef']
+ b = self._select(a)
+ self.assertEqual(a[0], b[0])
+
+ def test_select_unicode_array(self):
+ a = ['ᄲawef']
+ b = self._select(a)
+ self.assertEqual(a[0], b[0])
+
+ def test_lookup_cast(self):
+ from django.db.backends.postgresql.operations import DatabaseOperations
+ do = DatabaseOperations(connection=None)
+ lookups = (
+ 'iexact', 'contains', 'icontains', 'startswith', 'istartswith',
+ 'endswith', 'iendswith', 'regex', 'iregex',
+ )
+ for lookup in lookups:
+ with self.subTest(lookup=lookup):
+ self.assertIn('::text', do.lookup_cast(lookup))
+
+ def test_correct_extraction_psycopg2_version(self):
+ from django.db.backends.postgresql.base import psycopg2_version
+ with mock.patch('psycopg2.__version__', '4.2.1 (dt dec pq3 ext lo64)'):
+ self.assertEqual(psycopg2_version(), (4, 2, 1))
+ with mock.patch('psycopg2.__version__', '4.2b0.dev1 (dt dec pq3 ext lo64)'):
+ self.assertEqual(psycopg2_version(), (4, 2))
diff --git a/tests/backends/sqlite/__init__.py b/tests/backends/sqlite/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/backends/sqlite/__init__.py
diff --git a/tests/backends/sqlite/tests.py b/tests/backends/sqlite/tests.py
new file mode 100644
index 0000000000..838835ccdd
--- /dev/null
+++ b/tests/backends/sqlite/tests.py
@@ -0,0 +1,138 @@
+import re
+import threading
+import unittest
+
+from django.core.exceptions import ImproperlyConfigured
+from django.db import connection
+from django.db.models import Avg, StdDev, Sum, Variance
+from django.test import (
+ TestCase, TransactionTestCase, override_settings, skipUnlessDBFeature,
+)
+
+from ..models import Item, Object, Square
+
+
+@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+class Tests(TestCase):
+ longMessage = True
+
+ def test_autoincrement(self):
+ """
+ auto_increment fields are created with the AUTOINCREMENT keyword
+ in order to be monotonically increasing (#10164).
+ """
+ with connection.schema_editor(collect_sql=True) as editor:
+ editor.create_model(Square)
+ statements = editor.collected_sql
+ match = re.search('"id" ([^,]+),', statements[0])
+ self.assertIsNotNone(match)
+ self.assertEqual(
+ 'integer NOT NULL PRIMARY KEY AUTOINCREMENT',
+ match.group(1),
+ 'Wrong SQL used to create an auto-increment column on SQLite'
+ )
+
+ def test_aggregation(self):
+ """
+ Raise NotImplementedError when aggregating on date/time fields (#19360).
+ """
+ for aggregate in (Sum, Avg, Variance, StdDev):
+ with self.assertRaises(NotImplementedError):
+ Item.objects.all().aggregate(aggregate('time'))
+ with self.assertRaises(NotImplementedError):
+ Item.objects.all().aggregate(aggregate('date'))
+ with self.assertRaises(NotImplementedError):
+ Item.objects.all().aggregate(aggregate('last_modified'))
+ with self.assertRaises(NotImplementedError):
+ Item.objects.all().aggregate(
+ **{'complex': aggregate('last_modified') + aggregate('last_modified')}
+ )
+
+ def test_memory_db_test_name(self):
+ """A named in-memory db should be allowed where supported."""
+ from django.db.backends.sqlite3.base import DatabaseWrapper
+ settings_dict = {
+ 'TEST': {
+ 'NAME': 'file:memorydb_test?mode=memory&cache=shared',
+ }
+ }
+ wrapper = DatabaseWrapper(settings_dict)
+ creation = wrapper.creation
+ if creation.connection.features.can_share_in_memory_db:
+ expected = creation.connection.settings_dict['TEST']['NAME']
+ self.assertEqual(creation._get_test_db_name(), expected)
+ else:
+ msg = (
+ "Using a shared memory database with `mode=memory` in the "
+ "database name is not supported in your environment, "
+ "use `:memory:` instead."
+ )
+ with self.assertRaisesMessage(ImproperlyConfigured, msg):
+ creation._get_test_db_name()
+
+
+@unittest.skipUnless(connection.vendor == 'sqlite', 'Test only for SQLite')
+@override_settings(DEBUG=True)
+class LastExecutedQueryTest(TestCase):
+
+ def test_no_interpolation(self):
+ # This shouldn't raise an exception (#17158)
+ query = "SELECT strftime('%Y', 'now');"
+ connection.cursor().execute(query)
+ self.assertEqual(connection.queries[-1]['sql'], query)
+
+ def test_parameter_quoting(self):
+ # The implementation of last_executed_queries isn't optimal. It's
+ # worth testing that parameters are quoted (#14091).
+ query = "SELECT %s"
+ params = ["\"'\\"]
+ connection.cursor().execute(query, params)
+ # Note that the single quote is repeated
+ substituted = "SELECT '\"''\\'"
+ self.assertEqual(connection.queries[-1]['sql'], substituted)
+
+ def test_large_number_of_parameters(self):
+ # If SQLITE_MAX_VARIABLE_NUMBER (default = 999) has been changed to be
+ # greater than SQLITE_MAX_COLUMN (default = 2000), last_executed_query
+ # can hit the SQLITE_MAX_COLUMN limit (#26063).
+ cursor = connection.cursor()
+ sql = "SELECT MAX(%s)" % ", ".join(["%s"] * 2001)
+ params = list(range(2001))
+ # This should not raise an exception.
+ cursor.db.ops.last_executed_query(cursor.cursor, sql, params)
+
+
+@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+class EscapingChecks(TestCase):
+ """
+ All tests in this test case are also run with settings.DEBUG=True in
+ EscapingChecksDebug test case, to also test CursorDebugWrapper.
+ """
+ def test_parameter_escaping(self):
+ # '%s' escaping support for sqlite3 (#13648).
+ cursor = connection.cursor()
+ cursor.execute("select strftime('%s', date('now'))")
+ response = cursor.fetchall()[0][0]
+ # response should be an non-zero integer
+ self.assertTrue(int(response))
+
+
+@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+@override_settings(DEBUG=True)
+class EscapingChecksDebug(EscapingChecks):
+ pass
+
+
+@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
+@skipUnlessDBFeature('can_share_in_memory_db')
+class ThreadSharing(TransactionTestCase):
+ available_apps = ['backends']
+
+ def test_database_sharing_in_threads(self):
+ def create_object():
+ Object.objects.create()
+ create_object()
+ thread = threading.Thread(target=create_object)
+ thread.start()
+ thread.join()
+ self.assertEqual(Object.objects.count(), 2)
diff --git a/tests/backends/test_creation.py b/tests/backends/test_creation.py
deleted file mode 100644
index 7a18a8d19b..0000000000
--- a/tests/backends/test_creation.py
+++ /dev/null
@@ -1,244 +0,0 @@
-import copy
-import unittest
-from contextlib import contextmanager
-from io import StringIO
-from unittest import mock
-
-from django.db import DEFAULT_DB_ALIAS, connection, connections
-from django.db.backends.base.creation import (
- TEST_DATABASE_PREFIX, BaseDatabaseCreation,
-)
-from django.db.backends.mysql.creation import (
- DatabaseCreation as MySQLDatabaseCreation,
-)
-from django.db.backends.oracle.creation import (
- DatabaseCreation as OracleDatabaseCreation,
-)
-from django.db.utils import DatabaseError
-from django.test import SimpleTestCase, TestCase
-
-try:
- import psycopg2 # NOQA
-except ImportError:
- pass
-else:
- from psycopg2 import errorcodes
- from django.db.backends.postgresql.creation import \
- DatabaseCreation as PostgreSQLDatabaseCreation
-
-
-class TestDbSignatureTests(SimpleTestCase):
-
- def get_connection_copy(self):
- # Get a copy of the default connection. (Can't use django.db.connection
- # because it'll modify the default connection itself.)
- test_connection = copy.copy(connections[DEFAULT_DB_ALIAS])
- test_connection.settings_dict = copy.copy(connections[DEFAULT_DB_ALIAS].settings_dict)
- return test_connection
-
- def test_default_name(self):
- # A test db name isn't set.
- prod_name = 'hodor'
- test_connection = self.get_connection_copy()
- test_connection.settings_dict['NAME'] = prod_name
- test_connection.settings_dict['TEST'] = {'NAME': None}
- signature = BaseDatabaseCreation(test_connection).test_db_signature()
- self.assertEqual(signature[3], TEST_DATABASE_PREFIX + prod_name)
-
- def test_custom_test_name(self):
- # A regular test db name is set.
- test_name = 'hodor'
- test_connection = self.get_connection_copy()
- test_connection.settings_dict['TEST'] = {'NAME': test_name}
- signature = BaseDatabaseCreation(test_connection).test_db_signature()
- self.assertEqual(signature[3], test_name)
-
- def test_custom_test_name_with_test_prefix(self):
- # A test db name prefixed with TEST_DATABASE_PREFIX is set.
- test_name = TEST_DATABASE_PREFIX + 'hodor'
- test_connection = self.get_connection_copy()
- test_connection.settings_dict['TEST'] = {'NAME': test_name}
- signature = BaseDatabaseCreation(test_connection).test_db_signature()
- self.assertEqual(signature[3], test_name)
-
-
-@unittest.skipUnless(connection.vendor == 'postgresql', "PostgreSQL-specific tests")
-class PostgreSQLDatabaseCreationTests(SimpleTestCase):
-
- @contextmanager
- def changed_test_settings(self, **kwargs):
- settings = connection.settings_dict['TEST']
- saved_values = {}
- for name in kwargs:
- if name in settings:
- saved_values[name] = settings[name]
-
- for name, value in kwargs.items():
- settings[name] = value
- try:
- yield
- finally:
- for name, value in kwargs.items():
- if name in saved_values:
- settings[name] = saved_values[name]
- else:
- del settings[name]
-
- def check_sql_table_creation_suffix(self, settings, expected):
- with self.changed_test_settings(**settings):
- creation = PostgreSQLDatabaseCreation(connection)
- suffix = creation.sql_table_creation_suffix()
- self.assertEqual(suffix, expected)
-
- def test_sql_table_creation_suffix_with_none_settings(self):
- settings = {'CHARSET': None, 'TEMPLATE': None}
- self.check_sql_table_creation_suffix(settings, "")
-
- def test_sql_table_creation_suffix_with_encoding(self):
- settings = {'CHARSET': 'UTF8'}
- self.check_sql_table_creation_suffix(settings, "WITH ENCODING 'UTF8'")
-
- def test_sql_table_creation_suffix_with_template(self):
- settings = {'TEMPLATE': 'template0'}
- self.check_sql_table_creation_suffix(settings, 'WITH TEMPLATE "template0"')
-
- def test_sql_table_creation_suffix_with_encoding_and_template(self):
- settings = {'CHARSET': 'UTF8', 'TEMPLATE': 'template0'}
- self.check_sql_table_creation_suffix(settings, '''WITH ENCODING 'UTF8' TEMPLATE "template0"''')
-
- def _execute_raise_database_already_exists(self, cursor, parameters, keepdb=False):
- error = DatabaseError('database %s already exists' % parameters['dbname'])
- error.pgcode = errorcodes.DUPLICATE_DATABASE
- raise DatabaseError() from error
-
- def _execute_raise_permission_denied(self, cursor, parameters, keepdb=False):
- error = DatabaseError('permission denied to create database')
- error.pgcode = errorcodes.INSUFFICIENT_PRIVILEGE
- raise DatabaseError() from error
-
- def patch_test_db_creation(self, execute_create_test_db):
- return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
-
- @mock.patch('sys.stdout', new_callable=StringIO)
- @mock.patch('sys.stderr', new_callable=StringIO)
- def test_create_test_db(self, *mocked_objects):
- creation = PostgreSQLDatabaseCreation(connection)
- # Simulate test database creation raising "database already exists"
- with self.patch_test_db_creation(self._execute_raise_database_already_exists):
- with mock.patch('builtins.input', return_value='no'):
- with self.assertRaises(SystemExit):
- # SystemExit is raised if the user answers "no" to the
- # prompt asking if it's okay to delete the test database.
- creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
- # "Database already exists" error is ignored when keepdb is on
- creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
- # Simulate test database creation raising unexpected error
- with self.patch_test_db_creation(self._execute_raise_permission_denied):
- with self.assertRaises(SystemExit):
- creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
- with self.assertRaises(SystemExit):
- creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
-
-
-@unittest.skipUnless(connection.vendor == 'oracle', "Oracle specific tests")
-@mock.patch.object(OracleDatabaseCreation, '_maindb_connection', return_value=connection)
-@mock.patch('sys.stdout', new_callable=StringIO)
-@mock.patch('sys.stderr', new_callable=StringIO)
-class OracleDatabaseCreationTests(TestCase):
-
- def _execute_raise_user_already_exists(self, cursor, statements, parameters, verbosity, allow_quiet_fail=False):
- # Raise "user already exists" only in test user creation
- if statements and statements[0].startswith('CREATE USER'):
- raise DatabaseError("ORA-01920: user name 'string' conflicts with another user or role name")
-
- def _execute_raise_tablespace_already_exists(
- self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
- ):
- raise DatabaseError("ORA-01543: tablespace 'string' already exists")
-
- def _execute_raise_insufficient_privileges(
- self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
- ):
- raise DatabaseError("ORA-01031: insufficient privileges")
-
- def _test_database_passwd(self):
- # Mocked to avoid test user password changed
- return connection.settings_dict['SAVED_PASSWORD']
-
- def patch_execute_statements(self, execute_statements):
- return mock.patch.object(OracleDatabaseCreation, '_execute_statements', execute_statements)
-
- @mock.patch.object(OracleDatabaseCreation, '_test_user_create', return_value=False)
- def test_create_test_db(self, *mocked_objects):
- creation = OracleDatabaseCreation(connection)
- # Simulate test database creation raising "tablespace already exists"
- with self.patch_execute_statements(self._execute_raise_tablespace_already_exists):
- with mock.patch('builtins.input', return_value='no'):
- with self.assertRaises(SystemExit):
- # SystemExit is raised if the user answers "no" to the
- # prompt asking if it's okay to delete the test tablespace.
- creation._create_test_db(verbosity=0, keepdb=False)
- # "Tablespace already exists" error is ignored when keepdb is on
- creation._create_test_db(verbosity=0, keepdb=True)
- # Simulate test database creation raising unexpected error
- with self.patch_execute_statements(self._execute_raise_insufficient_privileges):
- with self.assertRaises(SystemExit):
- creation._create_test_db(verbosity=0, keepdb=False)
- with self.assertRaises(SystemExit):
- creation._create_test_db(verbosity=0, keepdb=True)
-
- @mock.patch.object(OracleDatabaseCreation, '_test_database_create', return_value=False)
- def test_create_test_user(self, *mocked_objects):
- creation = OracleDatabaseCreation(connection)
- with mock.patch.object(OracleDatabaseCreation, '_test_database_passwd', self._test_database_passwd):
- # Simulate test user creation raising "user already exists"
- with self.patch_execute_statements(self._execute_raise_user_already_exists):
- with mock.patch('builtins.input', return_value='no'):
- with self.assertRaises(SystemExit):
- # SystemExit is raised if the user answers "no" to the
- # prompt asking if it's okay to delete the test user.
- creation._create_test_db(verbosity=0, keepdb=False)
- # "User already exists" error is ignored when keepdb is on
- creation._create_test_db(verbosity=0, keepdb=True)
- # Simulate test user creation raising unexpected error
- with self.patch_execute_statements(self._execute_raise_insufficient_privileges):
- with self.assertRaises(SystemExit):
- creation._create_test_db(verbosity=0, keepdb=False)
- with self.assertRaises(SystemExit):
- creation._create_test_db(verbosity=0, keepdb=True)
-
-
-@unittest.skipUnless(connection.vendor == 'mysql', "MySQL specific tests")
-class MySQLDatabaseCreationTests(SimpleTestCase):
-
- def _execute_raise_database_exists(self, cursor, parameters, keepdb=False):
- raise DatabaseError(1007, "Can't create database '%s'; database exists" % parameters['dbname'])
-
- def _execute_raise_access_denied(self, cursor, parameters, keepdb=False):
- raise DatabaseError(1044, "Access denied for user")
-
- def patch_test_db_creation(self, execute_create_test_db):
- return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
-
- @mock.patch('sys.stdout', new_callable=StringIO)
- @mock.patch('sys.stderr', new_callable=StringIO)
- def test_create_test_db_database_exists(self, *mocked_objects):
- # Simulate test database creation raising "database exists"
- creation = MySQLDatabaseCreation(connection)
- with self.patch_test_db_creation(self._execute_raise_database_exists):
- with mock.patch('builtins.input', return_value='no'):
- with self.assertRaises(SystemExit):
- # SystemExit is raised if the user answers "no" to the
- # prompt asking if it's okay to delete the test database.
- creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
- # "Database exists" shouldn't appear when keepdb is on
- creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
-
- @mock.patch('sys.stdout', new_callable=StringIO)
- @mock.patch('sys.stderr', new_callable=StringIO)
- def test_create_test_db_unexpected_error(self, *mocked_objects):
- # Simulate test database creation raising unexpected error
- creation = MySQLDatabaseCreation(connection)
- with self.patch_test_db_creation(self._execute_raise_access_denied):
- with self.assertRaises(SystemExit):
- creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
diff --git a/tests/backends/test_utils.py b/tests/backends/test_utils.py
index 60429c5543..2ef1e4b9f7 100644
--- a/tests/backends/test_utils.py
+++ b/tests/backends/test_utils.py
@@ -1,22 +1,11 @@
-import unittest
+"""Tests for django.db.backends.utils"""
+from decimal import Decimal, Rounded
-from django.core.exceptions import ImproperlyConfigured
-from django.db import connection
-from django.db.backends.utils import truncate_name
-from django.db.utils import ProgrammingError, load_backend
-from django.test import SimpleTestCase, TestCase
+from django.db.backends.utils import format_number, truncate_name
+from django.test import SimpleTestCase
-class TestLoadBackend(SimpleTestCase):
- def test_load_backend_invalid_name(self):
- msg = (
- "'foo' isn't an available database backend.\n"
- "Try using 'django.db.backends.XXX', where XXX is one of:\n"
- " 'mysql', 'oracle', 'postgresql', 'sqlite3'"
- )
- with self.assertRaisesMessage(ImproperlyConfigured, msg) as cm:
- load_backend('foo')
- self.assertEqual(str(cm.exception.__cause__), "No module named 'foo'")
+class TestUtils(SimpleTestCase):
def test_truncate_name(self):
self.assertEqual(truncate_name('some_table', 10), 'some_table')
@@ -28,15 +17,31 @@ class TestLoadBackend(SimpleTestCase):
self.assertEqual(truncate_name('username"."some_long_table', 10), 'username"."some_la38a')
self.assertEqual(truncate_name('username"."some_long_table', 10, 3), 'username"."some_loa38')
+ def test_format_number(self):
+ def equal(value, max_d, places, result):
+ self.assertEqual(format_number(Decimal(value), max_d, places), result)
-@unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests')
-class TestDatabaseErrorWrapper(TestCase):
- def test_reraising_backend_specific_database_exception(self):
- cursor = connection.cursor()
- msg = 'table "X" does not exist'
- with self.assertRaisesMessage(ProgrammingError, msg) as cm:
- cursor.execute('DROP TABLE "X"')
- self.assertNotEqual(type(cm.exception), type(cm.exception.__cause__))
- self.assertIsNotNone(cm.exception.__cause__)
- self.assertIsNotNone(cm.exception.__cause__.pgcode)
- self.assertIsNotNone(cm.exception.__cause__.pgerror)
+ equal('0', 12, 3, '0.000')
+ equal('0', 12, 8, '0.00000000')
+ equal('1', 12, 9, '1.000000000')
+ equal('0.00000000', 12, 8, '0.00000000')
+ equal('0.000000004', 12, 8, '0.00000000')
+ equal('0.000000008', 12, 8, '0.00000001')
+ equal('0.000000000000000000999', 10, 8, '0.00000000')
+ equal('0.1234567890', 12, 10, '0.1234567890')
+ equal('0.1234567890', 12, 9, '0.123456789')
+ equal('0.1234567890', 12, 8, '0.12345679')
+ equal('0.1234567890', 12, 5, '0.12346')
+ equal('0.1234567890', 12, 3, '0.123')
+ equal('0.1234567890', 12, 1, '0.1')
+ equal('0.1234567890', 12, 0, '0')
+ equal('0.1234567890', None, 0, '0')
+ equal('1234567890.1234567890', None, 0, '1234567890')
+ equal('1234567890.1234567890', None, 2, '1234567890.12')
+ equal('0.1234', 5, None, '0.1234')
+ equal('123.12', 5, None, '123.12')
+
+ with self.assertRaises(Rounded):
+ equal('0.1234567890', 5, None, '0.12346')
+ with self.assertRaises(Rounded):
+ equal('1234567890.1234', 5, None, '1234600000')
diff --git a/tests/backends/tests.py b/tests/backends/tests.py
index 8847b178ef..6d38625a98 100644
--- a/tests/backends/tests.py
+++ b/tests/backends/tests.py
@@ -1,13 +1,9 @@
-# Unit and doctests for specific database backends.
+"""Tests related to django.db.backends that haven't been organized."""
import datetime
-import re
import threading
import unittest
import warnings
-from decimal import Decimal, Rounded
-from unittest import mock
-from django.core.exceptions import ImproperlyConfigured
from django.core.management.color import no_style
from django.db import (
DEFAULT_DB_ALIAS, DatabaseError, IntegrityError, connection, connections,
@@ -15,322 +11,20 @@ from django.db import (
)
from django.db.backends.base.base import BaseDatabaseWrapper
from django.db.backends.signals import connection_created
-from django.db.backends.utils import CursorWrapper, format_number
-from django.db.models import Avg, StdDev, Sum, Variance
+from django.db.backends.utils import CursorWrapper
from django.db.models.sql.constants import CURSOR
-from django.db.utils import ConnectionHandler
from django.test import (
- SimpleTestCase, TestCase, TransactionTestCase, override_settings,
- skipIfDBFeature, skipUnlessDBFeature,
+ TestCase, TransactionTestCase, override_settings, skipIfDBFeature,
+ skipUnlessDBFeature,
)
from .models import (
- Article, Item, Object, ObjectReference, Person, Post, RawData, Reporter,
+ Article, Object, ObjectReference, Person, Post, RawData, Reporter,
ReporterProxy, SchoolClass, Square,
VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ,
)
-class DatabaseWrapperTests(SimpleTestCase):
-
- def test_initialization_class_attributes(self):
- """
- The "initialization" class attributes like client_class and
- creation_class should be set on the class and reflected in the
- corresponding instance attributes of the instantiated backend.
- """
- conn = connections[DEFAULT_DB_ALIAS]
- conn_class = type(conn)
- attr_names = [
- ('client_class', 'client'),
- ('creation_class', 'creation'),
- ('features_class', 'features'),
- ('introspection_class', 'introspection'),
- ('ops_class', 'ops'),
- ('validation_class', 'validation'),
- ]
- for class_attr_name, instance_attr_name in attr_names:
- class_attr_value = getattr(conn_class, class_attr_name)
- self.assertIsNotNone(class_attr_value)
- instance_attr_value = getattr(conn, instance_attr_name)
- self.assertIsInstance(instance_attr_value, class_attr_value)
-
- def test_initialization_display_name(self):
- self.assertEqual(BaseDatabaseWrapper.display_name, 'unknown')
- self.assertNotEqual(connection.display_name, 'unknown')
-
-
-class DummyBackendTest(SimpleTestCase):
-
- def test_no_databases(self):
- """
- Empty DATABASES setting default to the dummy backend.
- """
- DATABASES = {}
- conns = ConnectionHandler(DATABASES)
- self.assertEqual(conns[DEFAULT_DB_ALIAS].settings_dict['ENGINE'], 'django.db.backends.dummy')
- with self.assertRaises(ImproperlyConfigured):
- conns[DEFAULT_DB_ALIAS].ensure_connection()
-
-
-@unittest.skipUnless(connection.vendor == 'oracle', "Test only for Oracle")
-class OracleTests(unittest.TestCase):
-
- def test_quote_name(self):
- # '%' chars are escaped for query execution.
- name = '"SOME%NAME"'
- quoted_name = connection.ops.quote_name(name)
- self.assertEqual(quoted_name % (), name)
-
- def test_dbms_session(self):
- # If the backend is Oracle, test that we can call a standard
- # stored procedure through our cursor wrapper.
- with connection.cursor() as cursor:
- cursor.callproc('DBMS_SESSION.SET_IDENTIFIER', ['_django_testing!'])
-
- def test_cursor_var(self):
- # If the backend is Oracle, test that we can pass cursor variables
- # as query parameters.
- from django.db.backends.oracle.base import Database
-
- with connection.cursor() as cursor:
- var = cursor.var(Database.STRING)
- cursor.execute("BEGIN %s := 'X'; END; ", [var])
- self.assertEqual(var.getvalue(), 'X')
-
- def test_long_string(self):
- # If the backend is Oracle, test that we can save a text longer
- # than 4000 chars and read it properly
- with connection.cursor() as cursor:
- cursor.execute('CREATE TABLE ltext ("TEXT" NCLOB)')
- long_str = ''.join(str(x) for x in range(4000))
- cursor.execute('INSERT INTO ltext VALUES (%s)', [long_str])
- cursor.execute('SELECT text FROM ltext')
- row = cursor.fetchone()
- self.assertEqual(long_str, row[0].read())
- cursor.execute('DROP TABLE ltext')
-
- def test_client_encoding(self):
- # If the backend is Oracle, test that the client encoding is set
- # correctly. This was broken under Cygwin prior to r14781.
- connection.ensure_connection()
- self.assertEqual(connection.connection.encoding, "UTF-8")
- self.assertEqual(connection.connection.nencoding, "UTF-8")
-
- def test_order_of_nls_parameters(self):
- # an 'almost right' datetime should work with configured
- # NLS parameters as per #18465.
- with connection.cursor() as cursor:
- query = "select 1 from dual where '1936-12-29 00:00' < sysdate"
- # The query succeeds without errors - pre #18465 this
- # wasn't the case.
- cursor.execute(query)
- self.assertEqual(cursor.fetchone()[0], 1)
-
-
-@unittest.skipUnless(connection.vendor == 'sqlite', "Test only for SQLite")
-class SQLiteTests(TestCase):
-
- longMessage = True
-
- def test_autoincrement(self):
- """
- auto_increment fields are created with the AUTOINCREMENT keyword
- in order to be monotonically increasing. Refs #10164.
- """
- with connection.schema_editor(collect_sql=True) as editor:
- editor.create_model(Square)
- statements = editor.collected_sql
- match = re.search('"id" ([^,]+),', statements[0])
- self.assertIsNotNone(match)
- self.assertEqual(
- 'integer NOT NULL PRIMARY KEY AUTOINCREMENT',
- match.group(1),
- "Wrong SQL used to create an auto-increment column on SQLite"
- )
-
- def test_aggregation(self):
- """
- #19360: Raise NotImplementedError when aggregating on date/time fields.
- """
- for aggregate in (Sum, Avg, Variance, StdDev):
- with self.assertRaises(NotImplementedError):
- Item.objects.all().aggregate(aggregate('time'))
- with self.assertRaises(NotImplementedError):
- Item.objects.all().aggregate(aggregate('date'))
- with self.assertRaises(NotImplementedError):
- Item.objects.all().aggregate(aggregate('last_modified'))
- with self.assertRaises(NotImplementedError):
- Item.objects.all().aggregate(
- **{'complex': aggregate('last_modified') + aggregate('last_modified')}
- )
-
- def test_memory_db_test_name(self):
- """
- A named in-memory db should be allowed where supported.
- """
- from django.db.backends.sqlite3.base import DatabaseWrapper
- settings_dict = {
- 'TEST': {
- 'NAME': 'file:memorydb_test?mode=memory&cache=shared',
- }
- }
- wrapper = DatabaseWrapper(settings_dict)
- creation = wrapper.creation
- if creation.connection.features.can_share_in_memory_db:
- expected = creation.connection.settings_dict['TEST']['NAME']
- self.assertEqual(creation._get_test_db_name(), expected)
- else:
- msg = (
- "Using a shared memory database with `mode=memory` in the "
- "database name is not supported in your environment, "
- "use `:memory:` instead."
- )
- with self.assertRaisesMessage(ImproperlyConfigured, msg):
- creation._get_test_db_name()
-
-
-@unittest.skipUnless(connection.vendor == 'postgresql', "Test only for PostgreSQL")
-class PostgreSQLTests(TestCase):
-
- def test_nodb_connection(self):
- """
- The _nodb_connection property fallbacks to the default connection
- database when access to the 'postgres' database is not granted.
- """
- def mocked_connect(self):
- if self.settings_dict['NAME'] is None:
- raise DatabaseError()
- return ''
-
- nodb_conn = connection._nodb_connection
- self.assertIsNone(nodb_conn.settings_dict['NAME'])
-
- # Now assume the 'postgres' db isn't available
- with warnings.catch_warnings(record=True) as w:
- with mock.patch('django.db.backends.base.base.BaseDatabaseWrapper.connect',
- side_effect=mocked_connect, autospec=True):
- warnings.simplefilter('always', RuntimeWarning)
- nodb_conn = connection._nodb_connection
- self.assertIsNotNone(nodb_conn.settings_dict['NAME'])
- self.assertEqual(nodb_conn.settings_dict['NAME'], connection.settings_dict['NAME'])
- # Check a RuntimeWarning has been emitted
- self.assertEqual(len(w), 1)
- self.assertEqual(w[0].message.__class__, RuntimeWarning)
-
- def test_connect_and_rollback(self):
- """
- PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
- transaction is rolled back (#17062).
- """
- new_connection = connection.copy()
-
- try:
- # Ensure the database default time zone is different than
- # the time zone in new_connection.settings_dict. We can
- # get the default time zone by reset & show.
- cursor = new_connection.cursor()
- cursor.execute("RESET TIMEZONE")
- cursor.execute("SHOW TIMEZONE")
- db_default_tz = cursor.fetchone()[0]
- new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC'
- new_connection.close()
-
- # Invalidate timezone name cache, because the setting_changed
- # handler cannot know about new_connection.
- del new_connection.timezone_name
-
- # Fetch a new connection with the new_tz as default
- # time zone, run a query and rollback.
- with self.settings(TIME_ZONE=new_tz):
- new_connection.set_autocommit(False)
- cursor = new_connection.cursor()
- new_connection.rollback()
-
- # Now let's see if the rollback rolled back the SET TIME ZONE.
- cursor.execute("SHOW TIMEZONE")
- tz = cursor.fetchone()[0]
- self.assertEqual(new_tz, tz)
-
- finally:
- new_connection.close()
-
- def test_connect_non_autocommit(self):
- """
- The connection wrapper shouldn't believe that autocommit is enabled
- after setting the time zone when AUTOCOMMIT is False (#21452).
- """
- new_connection = connection.copy()
- new_connection.settings_dict['AUTOCOMMIT'] = False
-
- try:
- # Open a database connection.
- new_connection.cursor()
- self.assertFalse(new_connection.get_autocommit())
- finally:
- new_connection.close()
-
- def test_connect_isolation_level(self):
- """
- Regression test for #18130 and #24318.
- """
- import psycopg2
- from psycopg2.extensions import (
- ISOLATION_LEVEL_READ_COMMITTED as read_committed,
- ISOLATION_LEVEL_SERIALIZABLE as serializable,
- )
-
- # Since this is a django.test.TestCase, a transaction is in progress
- # and the isolation level isn't reported as 0. This test assumes that
- # PostgreSQL is configured with the default isolation level.
-
- # Check the level on the psycopg2 connection, not the Django wrapper.
- default_level = read_committed if psycopg2.__version__ < '2.7' else None
- self.assertEqual(connection.connection.isolation_level, default_level)
-
- new_connection = connection.copy()
- new_connection.settings_dict['OPTIONS']['isolation_level'] = serializable
- try:
- # Start a transaction so the isolation level isn't reported as 0.
- new_connection.set_autocommit(False)
- # Check the level on the psycopg2 connection, not the Django wrapper.
- self.assertEqual(new_connection.connection.isolation_level, serializable)
- finally:
- new_connection.close()
-
- def _select(self, val):
- with connection.cursor() as cursor:
- cursor.execute("SELECT %s", (val,))
- return cursor.fetchone()[0]
-
- def test_select_ascii_array(self):
- a = ["awef"]
- b = self._select(a)
- self.assertEqual(a[0], b[0])
-
- def test_select_unicode_array(self):
- a = ["ᄲawef"]
- b = self._select(a)
- self.assertEqual(a[0], b[0])
-
- def test_lookup_cast(self):
- from django.db.backends.postgresql.operations import DatabaseOperations
-
- do = DatabaseOperations(connection=None)
- for lookup in ('iexact', 'contains', 'icontains', 'startswith',
- 'istartswith', 'endswith', 'iendswith', 'regex', 'iregex'):
- self.assertIn('::text', do.lookup_cast(lookup))
-
- def test_correct_extraction_psycopg2_version(self):
- from django.db.backends.postgresql.base import psycopg2_version
-
- with mock.patch('psycopg2.__version__', '4.2.1 (dt dec pq3 ext lo64)'):
- self.assertEqual(psycopg2_version(), (4, 2, 1))
-
- with mock.patch('psycopg2.__version__', '4.2b0.dev1 (dt dec pq3 ext lo64)'):
- self.assertEqual(psycopg2_version(), (4, 2))
-
-
class DateQuotingTest(TestCase):
def test_django_date_trunc(self):
@@ -379,38 +73,6 @@ class LastExecutedQueryTest(TestCase):
last_sql = cursor.db.ops.last_executed_query(cursor, sql, params)
self.assertIsInstance(last_sql, str)
- @unittest.skipUnless(connection.vendor == 'sqlite',
- "This test is specific to SQLite.")
- def test_no_interpolation_on_sqlite(self):
- # This shouldn't raise an exception (##17158)
- query = "SELECT strftime('%Y', 'now');"
- connection.cursor().execute(query)
- self.assertEqual(connection.queries[-1]['sql'], query)
-
- @unittest.skipUnless(connection.vendor == 'sqlite',
- "This test is specific to SQLite.")
- def test_parameter_quoting_on_sqlite(self):
- # The implementation of last_executed_queries isn't optimal. It's
- # worth testing that parameters are quoted. See #14091.
- query = "SELECT %s"
- params = ["\"'\\"]
- connection.cursor().execute(query, params)
- # Note that the single quote is repeated
- substituted = "SELECT '\"''\\'"
- self.assertEqual(connection.queries[-1]['sql'], substituted)
-
- @unittest.skipUnless(connection.vendor == 'sqlite',
- "This test is specific to SQLite.")
- def test_large_number_of_parameters_on_sqlite(self):
- # If SQLITE_MAX_VARIABLE_NUMBER (default = 999) has been changed to be
- # greater than SQLITE_MAX_COLUMN (default = 2000), last_executed_query
- # can hit the SQLITE_MAX_COLUMN limit. See #26063.
- cursor = connection.cursor()
- sql = "SELECT MAX(%s)" % ", ".join(["%s"] * 2001)
- params = list(range(2001))
- # This should not raise an exception.
- cursor.db.ops.last_executed_query(cursor.cursor, sql, params)
-
class ParameterHandlingTest(TestCase):
@@ -539,16 +201,6 @@ class EscapingChecks(TestCase):
cursor.execute("SELECT '%%', %s" + self.bare_select_suffix, ('%d',))
self.assertEqual(cursor.fetchall()[0], ('%', '%d'))
- @unittest.skipUnless(connection.vendor == 'sqlite',
- "This is an sqlite-specific issue")
- def test_sqlite_parameter_escaping(self):
- # '%s' escaping support for sqlite3 #13648
- cursor = connection.cursor()
- cursor.execute("select strftime('%s', date('now'))")
- response = cursor.fetchall()[0][0]
- # response should be an non-zero integer
- self.assertTrue(int(response))
-
@override_settings(DEBUG=True)
class EscapingChecksDebug(EscapingChecks):
@@ -1128,76 +780,3 @@ class DBConstraintTestCase(TestCase):
intermediary_model.objects.create(from_object_id=obj.id, to_object_id=12345)
self.assertEqual(obj.related_objects.count(), 1)
self.assertEqual(intermediary_model.objects.count(), 2)
-
-
-class BackendUtilTests(SimpleTestCase):
-
- def test_format_number(self):
- """
- Test the format_number converter utility
- """
- def equal(value, max_d, places, result):
- self.assertEqual(format_number(Decimal(value), max_d, places), result)
-
- equal('0', 12, 3,
- '0.000')
- equal('0', 12, 8,
- '0.00000000')
- equal('1', 12, 9,
- '1.000000000')
- equal('0.00000000', 12, 8,
- '0.00000000')
- equal('0.000000004', 12, 8,
- '0.00000000')
- equal('0.000000008', 12, 8,
- '0.00000001')
- equal('0.000000000000000000999', 10, 8,
- '0.00000000')
- equal('0.1234567890', 12, 10,
- '0.1234567890')
- equal('0.1234567890', 12, 9,
- '0.123456789')
- equal('0.1234567890', 12, 8,
- '0.12345679')
- equal('0.1234567890', 12, 5,
- '0.12346')
- equal('0.1234567890', 12, 3,
- '0.123')
- equal('0.1234567890', 12, 1,
- '0.1')
- equal('0.1234567890', 12, 0,
- '0')
- equal('0.1234567890', None, 0,
- '0')
- equal('1234567890.1234567890', None, 0,
- '1234567890')
- equal('1234567890.1234567890', None, 2,
- '1234567890.12')
- equal('0.1234', 5, None,
- '0.1234')
- equal('123.12', 5, None,
- '123.12')
- with self.assertRaises(Rounded):
- equal('0.1234567890', 5, None,
- '0.12346')
- with self.assertRaises(Rounded):
- equal('1234567890.1234', 5, None,
- '1234600000')
-
-
-@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite specific test.')
-@skipUnlessDBFeature('can_share_in_memory_db')
-class TestSqliteThreadSharing(TransactionTestCase):
- available_apps = ['backends']
-
- def test_database_sharing_in_threads(self):
- def create_object():
- Object.objects.create()
-
- create_object()
-
- thread = threading.Thread(target=create_object)
- thread.start()
- thread.join()
-
- self.assertEqual(Object.objects.count(), 2)
diff --git a/tests/db_utils/tests.py b/tests/db_utils/tests.py
new file mode 100644
index 0000000000..2a45342df5
--- /dev/null
+++ b/tests/db_utils/tests.py
@@ -0,0 +1,49 @@
+"""Tests for django.db.utils."""
+import unittest
+
+from django.core.exceptions import ImproperlyConfigured
+from django.db import DEFAULT_DB_ALIAS, connection
+from django.db.utils import ConnectionHandler, ProgrammingError, load_backend
+from django.test import SimpleTestCase, TestCase
+
+
+class ConnectionHandlerTests(SimpleTestCase):
+
+ def test_connection_handler_no_databases(self):
+ """Empty DATABASES setting defaults to the dummy backend."""
+ DATABASES = {}
+ conns = ConnectionHandler(DATABASES)
+ self.assertEqual(conns[DEFAULT_DB_ALIAS].settings_dict['ENGINE'], 'django.db.backends.dummy')
+ msg = (
+ 'settings.DATABASES is improperly configured. Please supply the '
+ 'ENGINE value. Check settings documentation for more details.'
+ )
+ with self.assertRaisesMessage(ImproperlyConfigured, msg):
+ conns[DEFAULT_DB_ALIAS].ensure_connection()
+
+
+class DatabaseErrorWrapperTests(TestCase):
+
+ @unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL test')
+ def test_reraising_backend_specific_database_exception(self):
+ cursor = connection.cursor()
+ msg = 'table "X" does not exist'
+ with self.assertRaisesMessage(ProgrammingError, msg) as cm:
+ cursor.execute('DROP TABLE "X"')
+ self.assertNotEqual(type(cm.exception), type(cm.exception.__cause__))
+ self.assertIsNotNone(cm.exception.__cause__)
+ self.assertIsNotNone(cm.exception.__cause__.pgcode)
+ self.assertIsNotNone(cm.exception.__cause__.pgerror)
+
+
+class LoadBackendTests(SimpleTestCase):
+
+ def test_load_backend_invalid_name(self):
+ msg = (
+ "'foo' isn't an available database backend.\n"
+ "Try using 'django.db.backends.XXX', where XXX is one of:\n"
+ " 'mysql', 'oracle', 'postgresql', 'sqlite3'"
+ )
+ with self.assertRaisesMessage(ImproperlyConfigured, msg) as cm:
+ load_backend('foo')
+ self.assertEqual(str(cm.exception.__cause__), "No module named 'foo'")