"""Tests for django.db.backends.utils""" from decimal import Decimal, Rounded from django.db import NotSupportedError, connection from django.db.backends.utils import ( format_number, split_identifier, split_tzname_delta, truncate_name, ) from django.test import ( SimpleTestCase, TransactionTestCase, skipIfDBFeature, skipUnlessDBFeature, ) class TestUtils(SimpleTestCase): def test_truncate_name(self): self.assertEqual(truncate_name("some_table", 10), "some_table") self.assertEqual(truncate_name("some_long_table", 10), "some_la38a") self.assertEqual(truncate_name("some_long_table", 10, 3), "some_loa38") self.assertEqual(truncate_name("some_long_table"), "some_long_table") # "user"."table" syntax self.assertEqual( truncate_name('username"."some_table', 10), 'username"."some_table' ) self.assertEqual( truncate_name('username"."some_long_table', 10), 'username"."some_la38a' ) self.assertEqual( truncate_name('username"."some_long_table', 10, 3), 'username"."some_loa38' ) def test_split_identifier(self): self.assertEqual(split_identifier("some_table"), ("", "some_table")) self.assertEqual(split_identifier('"some_table"'), ("", "some_table")) self.assertEqual( split_identifier('namespace"."some_table'), ("namespace", "some_table") ) self.assertEqual( split_identifier('"namespace"."some_table"'), ("namespace", "some_table") ) def test_format_number(self): def equal(value, max_d, places, result): self.assertEqual(format_number(Decimal(value), max_d, places), result) equal("0", 12, 3, "0.000") equal("0", 12, 8, "0.00000000") equal("1", 12, 9, "1.000000000") equal("0.00000000", 12, 8, "0.00000000") equal("0.000000004", 12, 8, "0.00000000") equal("0.000000008", 12, 8, "0.00000001") equal("0.000000000000000000999", 10, 8, "0.00000000") equal("0.1234567890", 12, 10, "0.1234567890") equal("0.1234567890", 12, 9, "0.123456789") equal("0.1234567890", 12, 8, "0.12345679") equal("0.1234567890", 12, 5, "0.12346") equal("0.1234567890", 12, 3, "0.123") equal("0.1234567890", 12, 1, "0.1") equal("0.1234567890", 12, 0, "0") equal("0.1234567890", None, 0, "0") equal("1234567890.1234567890", None, 0, "1234567890") equal("1234567890.1234567890", None, 2, "1234567890.12") equal("0.1234", 5, None, "0.1234") equal("123.12", 5, None, "123.12") with self.assertRaises(Rounded): equal("0.1234567890", 5, None, "0.12346") with self.assertRaises(Rounded): equal("1234567890.1234", 5, None, "1234600000") def test_split_tzname_delta(self): tests = [ ("Asia/Ust+Nera", ("Asia/Ust+Nera", None, None)), ("Asia/Ust-Nera", ("Asia/Ust-Nera", None, None)), ("Asia/Ust+Nera-02:00", ("Asia/Ust+Nera", "-", "02:00")), ("Asia/Ust-Nera+05:00", ("Asia/Ust-Nera", "+", "05:00")), ("America/Coral_Harbour-01:00", ("America/Coral_Harbour", "-", "01:00")), ("America/Coral_Harbour+02:30", ("America/Coral_Harbour", "+", "02:30")), ("UTC+15:00", ("UTC", "+", "15:00")), ("UTC-04:43", ("UTC", "-", "04:43")), ("UTC", ("UTC", None, None)), ("UTC+1", ("UTC+1", None, None)), ] for tzname, expected in tests: with self.subTest(tzname=tzname): self.assertEqual(split_tzname_delta(tzname), expected) class CursorWrapperTests(TransactionTestCase): available_apps = [] def _test_procedure(self, procedure_sql, params, param_types, kparams=None): with connection.cursor() as cursor: cursor.execute(procedure_sql) # Use a new cursor because in MySQL a procedure can't be used in the # same cursor in which it was created. with connection.cursor() as cursor: cursor.callproc("test_procedure", params, kparams) with connection.schema_editor() as editor: editor.remove_procedure("test_procedure", param_types) @skipUnlessDBFeature("create_test_procedure_without_params_sql") def test_callproc_without_params(self): self._test_procedure( connection.features.create_test_procedure_without_params_sql, [], [] ) @skipUnlessDBFeature("create_test_procedure_with_int_param_sql") def test_callproc_with_int_params(self): self._test_procedure( connection.features.create_test_procedure_with_int_param_sql, [1], ["INTEGER"], ) @skipUnlessDBFeature( "create_test_procedure_with_int_param_sql", "supports_callproc_kwargs" ) def test_callproc_kparams(self): self._test_procedure( connection.features.create_test_procedure_with_int_param_sql, [], ["INTEGER"], {"P_I": 1}, ) @skipIfDBFeature("supports_callproc_kwargs") def test_unsupported_callproc_kparams_raises_error(self): msg = ( "Keyword parameters for callproc are not supported on this database " "backend." ) with self.assertRaisesMessage(NotSupportedError, msg): with connection.cursor() as cursor: cursor.callproc("test_procedure", [], {"P_I": 1})