summaryrefslogtreecommitdiff
path: root/tests/inspectdb
diff options
context:
space:
mode:
authordjango-bot <ops@djangoproject.com>2022-02-03 20:24:19 +0100
committerMariusz Felisiak <felisiak.mariusz@gmail.com>2022-02-07 20:37:05 +0100
commit9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch)
treef0506b668a013d0063e5fba3dbf4863b466713ba /tests/inspectdb
parentf68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff)
downloaddjango-9c19aff7c7561e3a82978a272ecdaad40dda5c00.tar.gz
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/inspectdb')
-rw-r--r--tests/inspectdb/models.py48
-rw-r--r--tests/inspectdb/tests.py401
2 files changed, 274 insertions, 175 deletions
diff --git a/tests/inspectdb/models.py b/tests/inspectdb/models.py
index ac201eeaa2..f2c23f0e0c 100644
--- a/tests/inspectdb/models.py
+++ b/tests/inspectdb/models.py
@@ -3,11 +3,11 @@ from django.db import connection, models
class People(models.Model):
name = models.CharField(max_length=255)
- parent = models.ForeignKey('self', models.CASCADE)
+ parent = models.ForeignKey("self", models.CASCADE)
class Message(models.Model):
- from_field = models.ForeignKey(People, models.CASCADE, db_column='from_id')
+ from_field = models.ForeignKey(People, models.CASCADE, db_column="from_id")
class PeopleData(models.Model):
@@ -23,25 +23,27 @@ class PeopleMoreData(models.Model):
class ForeignKeyToField(models.Model):
to_field_fk = models.ForeignKey(
- PeopleMoreData, models.CASCADE, to_field='people_unique',
+ PeopleMoreData,
+ models.CASCADE,
+ to_field="people_unique",
)
class DigitsInColumnName(models.Model):
- all_digits = models.CharField(max_length=11, db_column='123')
- leading_digit = models.CharField(max_length=11, db_column='4extra')
- leading_digits = models.CharField(max_length=11, db_column='45extra')
+ all_digits = models.CharField(max_length=11, db_column="123")
+ leading_digit = models.CharField(max_length=11, db_column="4extra")
+ leading_digits = models.CharField(max_length=11, db_column="45extra")
class SpecialName(models.Model):
- field = models.IntegerField(db_column='field')
+ field = models.IntegerField(db_column="field")
# Underscores
- field_field_0 = models.IntegerField(db_column='Field_')
- field_field_1 = models.IntegerField(db_column='Field__')
- field_field_2 = models.IntegerField(db_column='__field')
+ field_field_0 = models.IntegerField(db_column="Field_")
+ field_field_1 = models.IntegerField(db_column="Field__")
+ field_field_2 = models.IntegerField(db_column="__field")
# Other chars
- prc_x = models.IntegerField(db_column='prc(%) x')
- non_ascii = models.IntegerField(db_column='tamaño')
+ prc_x = models.IntegerField(db_column="prc(%) x")
+ non_ascii = models.IntegerField(db_column="tamaño")
class Meta:
db_table = "inspectdb_special.table name"
@@ -80,38 +82,38 @@ class JSONFieldColumnType(models.Model):
class Meta:
required_db_features = {
- 'can_introspect_json_field',
- 'supports_json_field',
+ "can_introspect_json_field",
+ "supports_json_field",
}
-test_collation = connection.features.test_collations.get('non_default')
+test_collation = connection.features.test_collations.get("non_default")
class CharFieldDbCollation(models.Model):
char_field = models.CharField(max_length=10, db_collation=test_collation)
class Meta:
- required_db_features = {'supports_collation_on_charfield'}
+ required_db_features = {"supports_collation_on_charfield"}
class TextFieldDbCollation(models.Model):
text_field = models.TextField(db_collation=test_collation)
class Meta:
- required_db_features = {'supports_collation_on_textfield'}
+ required_db_features = {"supports_collation_on_textfield"}
class UniqueTogether(models.Model):
field1 = models.IntegerField()
field2 = models.CharField(max_length=10)
- from_field = models.IntegerField(db_column='from')
- non_unique = models.IntegerField(db_column='non__unique_column')
- non_unique_0 = models.IntegerField(db_column='non_unique__column')
+ from_field = models.IntegerField(db_column="from")
+ non_unique = models.IntegerField(db_column="non__unique_column")
+ non_unique_0 = models.IntegerField(db_column="non_unique__column")
class Meta:
unique_together = [
- ('field1', 'field2'),
- ('from_field', 'field1'),
- ('non_unique', 'non_unique_0'),
+ ("field1", "field2"),
+ ("from_field", "field1"),
+ ("non_unique", "non_unique_0"),
]
diff --git a/tests/inspectdb/tests.py b/tests/inspectdb/tests.py
index 20da7ade4d..83dbfd406d 100644
--- a/tests/inspectdb/tests.py
+++ b/tests/inspectdb/tests.py
@@ -16,31 +16,34 @@ def inspectdb_tables_only(table_name):
Limit introspection to tables created for models of this app.
Some databases such as Oracle are extremely slow at introspection.
"""
- return table_name.startswith('inspectdb_')
+ return table_name.startswith("inspectdb_")
def inspectdb_views_only(table_name):
- return (
- table_name.startswith('inspectdb_') and
- table_name.endswith(('_materialized', '_view'))
+ return table_name.startswith("inspectdb_") and table_name.endswith(
+ ("_materialized", "_view")
)
def special_table_only(table_name):
- return table_name.startswith('inspectdb_special')
+ return table_name.startswith("inspectdb_special")
class InspectDBTestCase(TestCase):
- unique_re = re.compile(r'.*unique_together = \((.+),\).*')
+ unique_re = re.compile(r".*unique_together = \((.+),\).*")
def test_stealth_table_name_filter_option(self):
out = StringIO()
- call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
- error_message = "inspectdb has examined a table that should have been filtered out."
+ call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
+ error_message = (
+ "inspectdb has examined a table that should have been filtered out."
+ )
# contrib.contenttypes is one of the apps always installed when running
# the Django test suite, check that one of its tables hasn't been
# inspected
- self.assertNotIn("class DjangoContentType(models.Model):", out.getvalue(), msg=error_message)
+ self.assertNotIn(
+ "class DjangoContentType(models.Model):", out.getvalue(), msg=error_message
+ )
def test_table_option(self):
"""
@@ -48,19 +51,19 @@ class InspectDBTestCase(TestCase):
arguments.
"""
out = StringIO()
- call_command('inspectdb', 'inspectdb_people', stdout=out)
+ call_command("inspectdb", "inspectdb_people", stdout=out)
output = out.getvalue()
- self.assertIn('class InspectdbPeople(models.Model):', output)
+ self.assertIn("class InspectdbPeople(models.Model):", output)
self.assertNotIn("InspectdbPeopledata", output)
def make_field_type_asserter(self):
"""Call inspectdb and return a function to validate a field type in its output"""
out = StringIO()
- call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
+ call_command("inspectdb", "inspectdb_columntypes", stdout=out)
output = out.getvalue()
def assertFieldType(name, definition):
- out_def = re.search(r'^\s*%s = (models.*)$' % name, output, re.MULTILINE)[1]
+ out_def = re.search(r"^\s*%s = (models.*)$" % name, output, re.MULTILINE)[1]
self.assertEqual(definition, out_def)
return assertFieldType
@@ -69,53 +72,65 @@ class InspectDBTestCase(TestCase):
"""Test introspection of various Django field types"""
assertFieldType = self.make_field_type_asserter()
introspected_field_types = connection.features.introspected_field_types
- char_field_type = introspected_field_types['CharField']
+ char_field_type = introspected_field_types["CharField"]
# Inspecting Oracle DB doesn't produce correct results (#19884):
# - it reports fields as blank=True when they aren't.
- if not connection.features.interprets_empty_strings_as_nulls and char_field_type == 'CharField':
- assertFieldType('char_field', "models.CharField(max_length=10)")
- assertFieldType('null_char_field', "models.CharField(max_length=10, blank=True, null=True)")
- assertFieldType('email_field', "models.CharField(max_length=254)")
- assertFieldType('file_field', "models.CharField(max_length=100)")
- assertFieldType('file_path_field', "models.CharField(max_length=100)")
- assertFieldType('slug_field', "models.CharField(max_length=50)")
- assertFieldType('text_field', "models.TextField()")
- assertFieldType('url_field', "models.CharField(max_length=200)")
- if char_field_type == 'TextField':
- assertFieldType('char_field', 'models.TextField()')
- assertFieldType('null_char_field', 'models.TextField(blank=True, null=True)')
- assertFieldType('email_field', 'models.TextField()')
- assertFieldType('file_field', 'models.TextField()')
- assertFieldType('file_path_field', 'models.TextField()')
- assertFieldType('slug_field', 'models.TextField()')
- assertFieldType('text_field', 'models.TextField()')
- assertFieldType('url_field', 'models.TextField()')
- assertFieldType('date_field', "models.DateField()")
- assertFieldType('date_time_field', "models.DateTimeField()")
- if introspected_field_types['GenericIPAddressField'] == 'GenericIPAddressField':
- assertFieldType('gen_ip_address_field', "models.GenericIPAddressField()")
+ if (
+ not connection.features.interprets_empty_strings_as_nulls
+ and char_field_type == "CharField"
+ ):
+ assertFieldType("char_field", "models.CharField(max_length=10)")
+ assertFieldType(
+ "null_char_field",
+ "models.CharField(max_length=10, blank=True, null=True)",
+ )
+ assertFieldType("email_field", "models.CharField(max_length=254)")
+ assertFieldType("file_field", "models.CharField(max_length=100)")
+ assertFieldType("file_path_field", "models.CharField(max_length=100)")
+ assertFieldType("slug_field", "models.CharField(max_length=50)")
+ assertFieldType("text_field", "models.TextField()")
+ assertFieldType("url_field", "models.CharField(max_length=200)")
+ if char_field_type == "TextField":
+ assertFieldType("char_field", "models.TextField()")
+ assertFieldType(
+ "null_char_field", "models.TextField(blank=True, null=True)"
+ )
+ assertFieldType("email_field", "models.TextField()")
+ assertFieldType("file_field", "models.TextField()")
+ assertFieldType("file_path_field", "models.TextField()")
+ assertFieldType("slug_field", "models.TextField()")
+ assertFieldType("text_field", "models.TextField()")
+ assertFieldType("url_field", "models.TextField()")
+ assertFieldType("date_field", "models.DateField()")
+ assertFieldType("date_time_field", "models.DateTimeField()")
+ if introspected_field_types["GenericIPAddressField"] == "GenericIPAddressField":
+ assertFieldType("gen_ip_address_field", "models.GenericIPAddressField()")
elif not connection.features.interprets_empty_strings_as_nulls:
- assertFieldType('gen_ip_address_field', "models.CharField(max_length=39)")
- assertFieldType('time_field', 'models.%s()' % introspected_field_types['TimeField'])
+ assertFieldType("gen_ip_address_field", "models.CharField(max_length=39)")
+ assertFieldType(
+ "time_field", "models.%s()" % introspected_field_types["TimeField"]
+ )
if connection.features.has_native_uuid_field:
- assertFieldType('uuid_field', "models.UUIDField()")
+ assertFieldType("uuid_field", "models.UUIDField()")
elif not connection.features.interprets_empty_strings_as_nulls:
- assertFieldType('uuid_field', "models.CharField(max_length=32)")
+ assertFieldType("uuid_field", "models.CharField(max_length=32)")
- @skipUnlessDBFeature('can_introspect_json_field', 'supports_json_field')
+ @skipUnlessDBFeature("can_introspect_json_field", "supports_json_field")
def test_json_field(self):
out = StringIO()
- call_command('inspectdb', 'inspectdb_jsonfieldcolumntype', stdout=out)
+ call_command("inspectdb", "inspectdb_jsonfieldcolumntype", stdout=out)
output = out.getvalue()
if not connection.features.interprets_empty_strings_as_nulls:
- self.assertIn('json_field = models.JSONField()', output)
- self.assertIn('null_json_field = models.JSONField(blank=True, null=True)', output)
+ self.assertIn("json_field = models.JSONField()", output)
+ self.assertIn(
+ "null_json_field = models.JSONField(blank=True, null=True)", output
+ )
- @skipUnlessDBFeature('supports_collation_on_charfield')
- @skipUnless(test_collation, 'Language collations are not supported.')
+ @skipUnlessDBFeature("supports_collation_on_charfield")
+ @skipUnless(test_collation, "Language collations are not supported.")
def test_char_field_db_collation(self):
out = StringIO()
- call_command('inspectdb', 'inspectdb_charfielddbcollation', stdout=out)
+ call_command("inspectdb", "inspectdb_charfielddbcollation", stdout=out)
output = out.getvalue()
if not connection.features.interprets_empty_strings_as_nulls:
self.assertIn(
@@ -130,11 +145,11 @@ class InspectDBTestCase(TestCase):
output,
)
- @skipUnlessDBFeature('supports_collation_on_textfield')
- @skipUnless(test_collation, 'Language collations are not supported.')
+ @skipUnlessDBFeature("supports_collation_on_textfield")
+ @skipUnless(test_collation, "Language collations are not supported.")
def test_text_field_db_collation(self):
out = StringIO()
- call_command('inspectdb', 'inspectdb_textfielddbcollation', stdout=out)
+ call_command("inspectdb", "inspectdb_textfielddbcollation", stdout=out)
output = out.getvalue()
if not connection.features.interprets_empty_strings_as_nulls:
self.assertIn(
@@ -153,36 +168,64 @@ class InspectDBTestCase(TestCase):
assertFieldType = self.make_field_type_asserter()
introspected_field_types = connection.features.introspected_field_types
- auto_field_type = connection.features.introspected_field_types['AutoField']
- if auto_field_type != 'AutoField':
- assertFieldType('id', "models.%s(primary_key=True) # AutoField?" % auto_field_type)
+ auto_field_type = connection.features.introspected_field_types["AutoField"]
+ if auto_field_type != "AutoField":
+ assertFieldType(
+ "id", "models.%s(primary_key=True) # AutoField?" % auto_field_type
+ )
- assertFieldType('big_int_field', 'models.%s()' % introspected_field_types['BigIntegerField'])
+ assertFieldType(
+ "big_int_field", "models.%s()" % introspected_field_types["BigIntegerField"]
+ )
- bool_field_type = introspected_field_types['BooleanField']
- assertFieldType('bool_field', "models.{}()".format(bool_field_type))
- assertFieldType('null_bool_field', 'models.{}(blank=True, null=True)'.format(bool_field_type))
+ bool_field_type = introspected_field_types["BooleanField"]
+ assertFieldType("bool_field", "models.{}()".format(bool_field_type))
+ assertFieldType(
+ "null_bool_field",
+ "models.{}(blank=True, null=True)".format(bool_field_type),
+ )
- if connection.vendor != 'sqlite':
- assertFieldType('decimal_field', "models.DecimalField(max_digits=6, decimal_places=1)")
- else: # Guessed arguments on SQLite, see #5014
- assertFieldType('decimal_field', "models.DecimalField(max_digits=10, decimal_places=5) "
- "# max_digits and decimal_places have been guessed, "
- "as this database handles decimal fields as float")
+ if connection.vendor != "sqlite":
+ assertFieldType(
+ "decimal_field", "models.DecimalField(max_digits=6, decimal_places=1)"
+ )
+ else: # Guessed arguments on SQLite, see #5014
+ assertFieldType(
+ "decimal_field",
+ "models.DecimalField(max_digits=10, decimal_places=5) "
+ "# max_digits and decimal_places have been guessed, "
+ "as this database handles decimal fields as float",
+ )
- assertFieldType('float_field', "models.FloatField()")
- assertFieldType('int_field', 'models.%s()' % introspected_field_types['IntegerField'])
- assertFieldType('pos_int_field', 'models.%s()' % introspected_field_types['PositiveIntegerField'])
- assertFieldType('pos_big_int_field', 'models.%s()' % introspected_field_types['PositiveBigIntegerField'])
- assertFieldType('pos_small_int_field', 'models.%s()' % introspected_field_types['PositiveSmallIntegerField'])
- assertFieldType('small_int_field', 'models.%s()' % introspected_field_types['SmallIntegerField'])
+ assertFieldType("float_field", "models.FloatField()")
+ assertFieldType(
+ "int_field", "models.%s()" % introspected_field_types["IntegerField"]
+ )
+ assertFieldType(
+ "pos_int_field",
+ "models.%s()" % introspected_field_types["PositiveIntegerField"],
+ )
+ assertFieldType(
+ "pos_big_int_field",
+ "models.%s()" % introspected_field_types["PositiveBigIntegerField"],
+ )
+ assertFieldType(
+ "pos_small_int_field",
+ "models.%s()" % introspected_field_types["PositiveSmallIntegerField"],
+ )
+ assertFieldType(
+ "small_int_field",
+ "models.%s()" % introspected_field_types["SmallIntegerField"],
+ )
- @skipUnlessDBFeature('can_introspect_foreign_keys')
+ @skipUnlessDBFeature("can_introspect_foreign_keys")
def test_attribute_name_not_python_keyword(self):
out = StringIO()
- call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
+ call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
output = out.getvalue()
- error_message = "inspectdb generated an attribute name which is a Python keyword"
+ error_message = (
+ "inspectdb generated an attribute name which is a Python keyword"
+ )
# Recursive foreign keys should be set to 'self'
self.assertIn("parent = models.ForeignKey('self', models.DO_NOTHING)", output)
self.assertNotIn(
@@ -196,18 +239,18 @@ class InspectDBTestCase(TestCase):
output,
)
self.assertIn(
- 'people_pk = models.OneToOneField(InspectdbPeople, models.DO_NOTHING, primary_key=True)',
+ "people_pk = models.OneToOneField(InspectdbPeople, models.DO_NOTHING, primary_key=True)",
output,
)
self.assertIn(
- 'people_unique = models.OneToOneField(InspectdbPeople, models.DO_NOTHING)',
+ "people_unique = models.OneToOneField(InspectdbPeople, models.DO_NOTHING)",
output,
)
- @skipUnlessDBFeature('can_introspect_foreign_keys')
+ @skipUnlessDBFeature("can_introspect_foreign_keys")
def test_foreign_key_to_field(self):
out = StringIO()
- call_command('inspectdb', 'inspectdb_foreignkeytofield', stdout=out)
+ call_command("inspectdb", "inspectdb_foreignkeytofield", stdout=out)
self.assertIn(
"to_field_fk = models.ForeignKey('InspectdbPeoplemoredata', "
"models.DO_NOTHING, to_field='people_unique_id')",
@@ -216,20 +259,28 @@ class InspectDBTestCase(TestCase):
def test_digits_column_name_introspection(self):
"""Introspection of column names consist/start with digits (#16536/#17676)"""
- char_field_type = connection.features.introspected_field_types['CharField']
+ char_field_type = connection.features.introspected_field_types["CharField"]
out = StringIO()
- call_command('inspectdb', 'inspectdb_digitsincolumnname', stdout=out)
+ call_command("inspectdb", "inspectdb_digitsincolumnname", stdout=out)
output = out.getvalue()
error_message = "inspectdb generated a model field name which is a number"
- self.assertNotIn(' 123 = models.%s' % char_field_type, output, msg=error_message)
- self.assertIn('number_123 = models.%s' % char_field_type, output)
+ self.assertNotIn(
+ " 123 = models.%s" % char_field_type, output, msg=error_message
+ )
+ self.assertIn("number_123 = models.%s" % char_field_type, output)
- error_message = "inspectdb generated a model field name which starts with a digit"
- self.assertNotIn(' 4extra = models.%s' % char_field_type, output, msg=error_message)
- self.assertIn('number_4extra = models.%s' % char_field_type, output)
+ error_message = (
+ "inspectdb generated a model field name which starts with a digit"
+ )
+ self.assertNotIn(
+ " 4extra = models.%s" % char_field_type, output, msg=error_message
+ )
+ self.assertIn("number_4extra = models.%s" % char_field_type, output)
- self.assertNotIn(' 45extra = models.%s' % char_field_type, output, msg=error_message)
- self.assertIn('number_45extra = models.%s' % char_field_type, output)
+ self.assertNotIn(
+ " 45extra = models.%s" % char_field_type, output, msg=error_message
+ )
+ self.assertIn("number_45extra = models.%s" % char_field_type, output)
def test_special_column_name_introspection(self):
"""
@@ -237,15 +288,30 @@ class InspectDBTestCase(TestCase):
unsuitable for Python identifiers
"""
out = StringIO()
- call_command('inspectdb', table_name_filter=special_table_only, stdout=out)
+ call_command("inspectdb", table_name_filter=special_table_only, stdout=out)
output = out.getvalue()
- base_name = connection.introspection.identifier_converter('Field')
- integer_field_type = connection.features.introspected_field_types['IntegerField']
+ base_name = connection.introspection.identifier_converter("Field")
+ integer_field_type = connection.features.introspected_field_types[
+ "IntegerField"
+ ]
self.assertIn("field = models.%s()" % integer_field_type, output)
- self.assertIn("field_field = models.%s(db_column='%s_')" % (integer_field_type, base_name), output)
- self.assertIn("field_field_0 = models.%s(db_column='%s__')" % (integer_field_type, base_name), output)
- self.assertIn("field_field_1 = models.%s(db_column='__field')" % integer_field_type, output)
- self.assertIn("prc_x = models.{}(db_column='prc(%) x')".format(integer_field_type), output)
+ self.assertIn(
+ "field_field = models.%s(db_column='%s_')"
+ % (integer_field_type, base_name),
+ output,
+ )
+ self.assertIn(
+ "field_field_0 = models.%s(db_column='%s__')"
+ % (integer_field_type, base_name),
+ output,
+ )
+ self.assertIn(
+ "field_field_1 = models.%s(db_column='__field')" % integer_field_type,
+ output,
+ )
+ self.assertIn(
+ "prc_x = models.{}(db_column='prc(%) x')".format(integer_field_type), output
+ )
self.assertIn("tamaño = models.%s()" % integer_field_type, output)
def test_table_name_introspection(self):
@@ -254,21 +320,25 @@ class InspectDBTestCase(TestCase):
unsuitable for Python identifiers
"""
out = StringIO()
- call_command('inspectdb', table_name_filter=special_table_only, stdout=out)
+ call_command("inspectdb", table_name_filter=special_table_only, stdout=out)
output = out.getvalue()
self.assertIn("class InspectdbSpecialTableName(models.Model):", output)
def test_managed_models(self):
"""By default the command generates models with `Meta.managed = False` (#14305)"""
out = StringIO()
- call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
+ call_command("inspectdb", "inspectdb_columntypes", stdout=out)
output = out.getvalue()
self.longMessage = False
- self.assertIn(" managed = False", output, msg='inspectdb should generate unmanaged models.')
+ self.assertIn(
+ " managed = False",
+ output,
+ msg="inspectdb should generate unmanaged models.",
+ )
def test_unique_together_meta(self):
out = StringIO()
- call_command('inspectdb', 'inspectdb_uniquetogether', stdout=out)
+ call_command("inspectdb", "inspectdb_uniquetogether", stdout=out)
output = out.getvalue()
self.assertIn(" unique_together = (('", output)
unique_together_match = self.unique_re.findall(output)
@@ -283,43 +353,50 @@ class InspectDBTestCase(TestCase):
# are given an integer suffix.
self.assertIn("('non_unique_column', 'non_unique_column_0')", fields)
- @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
+ @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
def test_unsupported_unique_together(self):
"""Unsupported index types (COALESCE here) are skipped."""
with connection.cursor() as c:
c.execute(
- 'CREATE UNIQUE INDEX Findex ON %s '
- '(id, people_unique_id, COALESCE(message_id, -1))' % PeopleMoreData._meta.db_table
+ "CREATE UNIQUE INDEX Findex ON %s "
+ "(id, people_unique_id, COALESCE(message_id, -1))"
+ % PeopleMoreData._meta.db_table
)
try:
out = StringIO()
call_command(
- 'inspectdb',
- table_name_filter=lambda tn: tn.startswith(PeopleMoreData._meta.db_table),
+ "inspectdb",
+ table_name_filter=lambda tn: tn.startswith(
+ PeopleMoreData._meta.db_table
+ ),
stdout=out,
)
output = out.getvalue()
- self.assertIn('# A unique constraint could not be introspected.', output)
- self.assertEqual(self.unique_re.findall(output), ["('id', 'people_unique')"])
+ self.assertIn("# A unique constraint could not be introspected.", output)
+ self.assertEqual(
+ self.unique_re.findall(output), ["('id', 'people_unique')"]
+ )
finally:
with connection.cursor() as c:
- c.execute('DROP INDEX Findex')
+ c.execute("DROP INDEX Findex")
- @skipUnless(connection.vendor == 'sqlite',
- "Only patched sqlite's DatabaseIntrospection.data_types_reverse for this test")
+ @skipUnless(
+ connection.vendor == "sqlite",
+ "Only patched sqlite's DatabaseIntrospection.data_types_reverse for this test",
+ )
def test_custom_fields(self):
"""
Introspection of columns with a custom field (#21090)
"""
out = StringIO()
with mock.patch(
- 'django.db.connection.introspection.data_types_reverse.base_data_types_reverse',
+ "django.db.connection.introspection.data_types_reverse.base_data_types_reverse",
{
- 'text': 'myfields.TextField',
- 'bigint': 'BigIntegerField',
+ "text": "myfields.TextField",
+ "bigint": "BigIntegerField",
},
):
- call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
+ call_command("inspectdb", "inspectdb_columntypes", stdout=out)
output = out.getvalue()
self.assertIn("text_field = myfields.TextField()", output)
self.assertIn("big_int_field = models.BigIntegerField()", output)
@@ -330,9 +407,11 @@ class InspectDBTestCase(TestCase):
be visible in the output.
"""
out = StringIO()
- with mock.patch('django.db.connection.introspection.get_table_list',
- return_value=[TableInfo(name='nonexistent', type='t')]):
- call_command('inspectdb', stdout=out)
+ with mock.patch(
+ "django.db.connection.introspection.get_table_list",
+ return_value=[TableInfo(name="nonexistent", type="t")],
+ ):
+ call_command("inspectdb", stdout=out)
output = out.getvalue()
self.assertIn("# Unable to inspect table 'nonexistent'", output)
# The error message depends on the backend
@@ -340,21 +419,21 @@ class InspectDBTestCase(TestCase):
class InspectDBTransactionalTests(TransactionTestCase):
- available_apps = ['inspectdb']
+ available_apps = ["inspectdb"]
def test_include_views(self):
"""inspectdb --include-views creates models for database views."""
with connection.cursor() as cursor:
cursor.execute(
- 'CREATE VIEW inspectdb_people_view AS '
- 'SELECT id, name FROM inspectdb_people'
+ "CREATE VIEW inspectdb_people_view AS "
+ "SELECT id, name FROM inspectdb_people"
)
out = StringIO()
- view_model = 'class InspectdbPeopleView(models.Model):'
- view_managed = 'managed = False # Created from a view.'
+ view_model = "class InspectdbPeopleView(models.Model):"
+ view_managed = "managed = False # Created from a view."
try:
call_command(
- 'inspectdb',
+ "inspectdb",
table_name_filter=inspectdb_views_only,
stdout=out,
)
@@ -362,7 +441,7 @@ class InspectDBTransactionalTests(TransactionTestCase):
self.assertNotIn(view_model, no_views_output)
self.assertNotIn(view_managed, no_views_output)
call_command(
- 'inspectdb',
+ "inspectdb",
table_name_filter=inspectdb_views_only,
include_views=True,
stdout=out,
@@ -372,22 +451,22 @@ class InspectDBTransactionalTests(TransactionTestCase):
self.assertIn(view_managed, with_views_output)
finally:
with connection.cursor() as cursor:
- cursor.execute('DROP VIEW inspectdb_people_view')
+ cursor.execute("DROP VIEW inspectdb_people_view")
- @skipUnlessDBFeature('can_introspect_materialized_views')
+ @skipUnlessDBFeature("can_introspect_materialized_views")
def test_include_materialized_views(self):
"""inspectdb --include-views creates models for materialized views."""
with connection.cursor() as cursor:
cursor.execute(
- 'CREATE MATERIALIZED VIEW inspectdb_people_materialized AS '
- 'SELECT id, name FROM inspectdb_people'
+ "CREATE MATERIALIZED VIEW inspectdb_people_materialized AS "
+ "SELECT id, name FROM inspectdb_people"
)
out = StringIO()
- view_model = 'class InspectdbPeopleMaterialized(models.Model):'
- view_managed = 'managed = False # Created from a view.'
+ view_model = "class InspectdbPeopleMaterialized(models.Model):"
+ view_managed = "managed = False # Created from a view."
try:
call_command(
- 'inspectdb',
+ "inspectdb",
table_name_filter=inspectdb_views_only,
stdout=out,
)
@@ -395,7 +474,7 @@ class InspectDBTransactionalTests(TransactionTestCase):
self.assertNotIn(view_model, no_views_output)
self.assertNotIn(view_managed, no_views_output)
call_command(
- 'inspectdb',
+ "inspectdb",
table_name_filter=inspectdb_views_only,
include_views=True,
stdout=out,
@@ -405,47 +484,61 @@ class InspectDBTransactionalTests(TransactionTestCase):
self.assertIn(view_managed, with_views_output)
finally:
with connection.cursor() as cursor:
- cursor.execute('DROP MATERIALIZED VIEW inspectdb_people_materialized')
+ cursor.execute("DROP MATERIALIZED VIEW inspectdb_people_materialized")
- @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
+ @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
def test_include_partitions(self):
"""inspectdb --include-partitions creates models for partitions."""
with connection.cursor() as cursor:
- cursor.execute('''\
+ cursor.execute(
+ """\
CREATE TABLE inspectdb_partition_parent (name text not null)
PARTITION BY LIST (left(upper(name), 1))
- ''')
- cursor.execute('''\
+ """
+ )
+ cursor.execute(
+ """\
CREATE TABLE inspectdb_partition_child
PARTITION OF inspectdb_partition_parent
FOR VALUES IN ('A', 'B', 'C')
- ''')
+ """
+ )
out = StringIO()
- partition_model_parent = 'class InspectdbPartitionParent(models.Model):'
- partition_model_child = 'class InspectdbPartitionChild(models.Model):'
- partition_managed = 'managed = False # Created from a partition.'
+ partition_model_parent = "class InspectdbPartitionParent(models.Model):"
+ partition_model_child = "class InspectdbPartitionChild(models.Model):"
+ partition_managed = "managed = False # Created from a partition."
try:
- call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
+ call_command(
+ "inspectdb", table_name_filter=inspectdb_tables_only, stdout=out
+ )
no_partitions_output = out.getvalue()
self.assertIn(partition_model_parent, no_partitions_output)
self.assertNotIn(partition_model_child, no_partitions_output)
self.assertNotIn(partition_managed, no_partitions_output)
- call_command('inspectdb', table_name_filter=inspectdb_tables_only, include_partitions=True, stdout=out)
+ call_command(
+ "inspectdb",
+ table_name_filter=inspectdb_tables_only,
+ include_partitions=True,
+ stdout=out,
+ )
with_partitions_output = out.getvalue()
self.assertIn(partition_model_parent, with_partitions_output)
self.assertIn(partition_model_child, with_partitions_output)
self.assertIn(partition_managed, with_partitions_output)
finally:
with connection.cursor() as cursor:
- cursor.execute('DROP TABLE IF EXISTS inspectdb_partition_child')
- cursor.execute('DROP TABLE IF EXISTS inspectdb_partition_parent')
+ cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_child")
+ cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_parent")
- @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
+ @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
def test_foreign_data_wrapper(self):
with connection.cursor() as cursor:
- cursor.execute('CREATE EXTENSION IF NOT EXISTS file_fdw')
- cursor.execute('CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw')
- cursor.execute('''\
+ cursor.execute("CREATE EXTENSION IF NOT EXISTS file_fdw")
+ cursor.execute(
+ "CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw"
+ )
+ cursor.execute(
+ """\
CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
petal_length real,
petal_width real,
@@ -454,13 +547,15 @@ class InspectDBTransactionalTests(TransactionTestCase):
) SERVER inspectdb_server OPTIONS (
filename %s
)
- ''', [os.devnull])
+ """,
+ [os.devnull],
+ )
out = StringIO()
- foreign_table_model = 'class InspectdbIrisForeignTable(models.Model):'
- foreign_table_managed = 'managed = False'
+ foreign_table_model = "class InspectdbIrisForeignTable(models.Model):"
+ foreign_table_managed = "managed = False"
try:
call_command(
- 'inspectdb',
+ "inspectdb",
table_name_filter=inspectdb_tables_only,
stdout=out,
)
@@ -469,6 +564,8 @@ class InspectDBTransactionalTests(TransactionTestCase):
self.assertIn(foreign_table_managed, output)
finally:
with connection.cursor() as cursor:
- cursor.execute('DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table')
- cursor.execute('DROP SERVER IF EXISTS inspectdb_server')
- cursor.execute('DROP EXTENSION IF EXISTS file_fdw')
+ cursor.execute(
+ "DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table"
+ )
+ cursor.execute("DROP SERVER IF EXISTS inspectdb_server")
+ cursor.execute("DROP EXTENSION IF EXISTS file_fdw")