diff options
author | django-bot <ops@djangoproject.com> | 2022-02-03 20:24:19 +0100 |
---|---|---|
committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-02-07 20:37:05 +0100 |
commit | 9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch) | |
tree | f0506b668a013d0063e5fba3dbf4863b466713ba /tests/bulk_create | |
parent | f68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff) | |
download | django-9c19aff7c7561e3a82978a272ecdaad40dda5c00.tar.gz |
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/bulk_create')
-rw-r--r-- | tests/bulk_create/models.py | 46 | ||||
-rw-r--r-- | tests/bulk_create/tests.py | 585 |
2 files changed, 377 insertions, 254 deletions
diff --git a/tests/bulk_create/models.py b/tests/bulk_create/models.py index f0db69932e..27abc416bd 100644 --- a/tests/bulk_create/models.py +++ b/tests/bulk_create/models.py @@ -19,8 +19,8 @@ class Country(models.Model): class Meta: constraints = [ models.UniqueConstraint( - fields=['iso_two_letter', 'name'], - name='country_name_iso_unique', + fields=["iso_two_letter", "name"], + name="country_name_iso_unique", ), ] @@ -90,33 +90,45 @@ class BigAutoFieldModel(models.Model): class NullableFields(models.Model): # Fields in db.backends.oracle.BulkInsertMapper big_int_filed = models.BigIntegerField(null=True, default=1) - binary_field = models.BinaryField(null=True, default=b'data') + binary_field = models.BinaryField(null=True, default=b"data") date_field = models.DateField(null=True, default=timezone.now) datetime_field = models.DateTimeField(null=True, default=timezone.now) - decimal_field = models.DecimalField(null=True, max_digits=2, decimal_places=1, default=Decimal('1.1')) + decimal_field = models.DecimalField( + null=True, max_digits=2, decimal_places=1, default=Decimal("1.1") + ) duration_field = models.DurationField(null=True, default=datetime.timedelta(1)) float_field = models.FloatField(null=True, default=3.2) integer_field = models.IntegerField(null=True, default=2) null_boolean_field = models.BooleanField(null=True, default=False) - positive_big_integer_field = models.PositiveBigIntegerField(null=True, default=2 ** 63 - 1) + positive_big_integer_field = models.PositiveBigIntegerField( + null=True, default=2**63 - 1 + ) positive_integer_field = models.PositiveIntegerField(null=True, default=3) - positive_small_integer_field = models.PositiveSmallIntegerField(null=True, default=4) + positive_small_integer_field = models.PositiveSmallIntegerField( + null=True, default=4 + ) small_integer_field = models.SmallIntegerField(null=True, default=5) time_field = models.TimeField(null=True, default=timezone.now) auto_field = models.ForeignKey(NoFields, on_delete=models.CASCADE, null=True) - small_auto_field = models.ForeignKey(SmallAutoFieldModel, on_delete=models.CASCADE, null=True) - big_auto_field = models.ForeignKey(BigAutoFieldModel, on_delete=models.CASCADE, null=True) + small_auto_field = models.ForeignKey( + SmallAutoFieldModel, on_delete=models.CASCADE, null=True + ) + big_auto_field = models.ForeignKey( + BigAutoFieldModel, on_delete=models.CASCADE, null=True + ) # Fields not required in BulkInsertMapper - char_field = models.CharField(null=True, max_length=4, default='char') - email_field = models.EmailField(null=True, default='user@example.com') - file_field = models.FileField(null=True, default='file.txt') - file_path_field = models.FilePathField(path='/tmp', null=True, default='file.txt') - generic_ip_address_field = models.GenericIPAddressField(null=True, default='127.0.0.1') + char_field = models.CharField(null=True, max_length=4, default="char") + email_field = models.EmailField(null=True, default="user@example.com") + file_field = models.FileField(null=True, default="file.txt") + file_path_field = models.FilePathField(path="/tmp", null=True, default="file.txt") + generic_ip_address_field = models.GenericIPAddressField( + null=True, default="127.0.0.1" + ) if Image: - image_field = models.ImageField(null=True, default='image.jpg') - slug_field = models.SlugField(null=True, default='slug') - text_field = models.TextField(null=True, default='text') - url_field = models.URLField(null=True, default='/') + image_field = models.ImageField(null=True, default="image.jpg") + slug_field = models.SlugField(null=True, default="slug") + text_field = models.TextField(null=True, default="text") + url_field = models.URLField(null=True, default="/") uuid_field = models.UUIDField(null=True, default=uuid.uuid4) diff --git a/tests/bulk_create/tests.py b/tests/bulk_create/tests.py index 8040f16dcf..a01e50f4cb 100644 --- a/tests/bulk_create/tests.py +++ b/tests/bulk_create/tests.py @@ -3,19 +3,36 @@ from operator import attrgetter from django.core.exceptions import FieldDoesNotExist from django.db import ( - IntegrityError, NotSupportedError, OperationalError, ProgrammingError, + IntegrityError, + NotSupportedError, + OperationalError, + ProgrammingError, connection, ) from django.db.models import FileField, Value from django.db.models.functions import Lower from django.test import ( - TestCase, override_settings, skipIfDBFeature, skipUnlessDBFeature, + TestCase, + override_settings, + skipIfDBFeature, + skipUnlessDBFeature, ) from .models import ( - BigAutoFieldModel, Country, NoFields, NullableFields, Pizzeria, - ProxyCountry, ProxyMultiCountry, ProxyMultiProxyCountry, ProxyProxyCountry, - RelatedModel, Restaurant, SmallAutoFieldModel, State, TwoFields, + BigAutoFieldModel, + Country, + NoFields, + NullableFields, + Pizzeria, + ProxyCountry, + ProxyMultiCountry, + ProxyMultiProxyCountry, + ProxyProxyCountry, + RelatedModel, + Restaurant, + SmallAutoFieldModel, + State, + TwoFields, UpsertConflict, ) @@ -26,142 +43,179 @@ class BulkCreateTests(TestCase): Country(name="United States of America", iso_two_letter="US"), Country(name="The Netherlands", iso_two_letter="NL"), Country(name="Germany", iso_two_letter="DE"), - Country(name="Czech Republic", iso_two_letter="CZ") + Country(name="Czech Republic", iso_two_letter="CZ"), ] def test_simple(self): created = Country.objects.bulk_create(self.data) self.assertEqual(created, self.data) - self.assertQuerysetEqual(Country.objects.order_by("-name"), [ - "United States of America", "The Netherlands", "Germany", "Czech Republic" - ], attrgetter("name")) + self.assertQuerysetEqual( + Country.objects.order_by("-name"), + [ + "United States of America", + "The Netherlands", + "Germany", + "Czech Republic", + ], + attrgetter("name"), + ) created = Country.objects.bulk_create([]) self.assertEqual(created, []) self.assertEqual(Country.objects.count(), 4) - @skipUnlessDBFeature('has_bulk_insert') + @skipUnlessDBFeature("has_bulk_insert") def test_efficiency(self): with self.assertNumQueries(1): Country.objects.bulk_create(self.data) - @skipUnlessDBFeature('has_bulk_insert') + @skipUnlessDBFeature("has_bulk_insert") def test_long_non_ascii_text(self): """ Inserting non-ASCII values with a length in the range 2001 to 4000 characters, i.e. 4002 to 8000 bytes, must be set as a CLOB on Oracle (#22144). """ - Country.objects.bulk_create([Country(description='Ж' * 3000)]) + Country.objects.bulk_create([Country(description="Ж" * 3000)]) self.assertEqual(Country.objects.count(), 1) - @skipUnlessDBFeature('has_bulk_insert') + @skipUnlessDBFeature("has_bulk_insert") def test_long_and_short_text(self): - Country.objects.bulk_create([ - Country(description='a' * 4001, iso_two_letter='A'), - Country(description='a', iso_two_letter='B'), - Country(description='Ж' * 2001, iso_two_letter='C'), - Country(description='Ж', iso_two_letter='D'), - ]) + Country.objects.bulk_create( + [ + Country(description="a" * 4001, iso_two_letter="A"), + Country(description="a", iso_two_letter="B"), + Country(description="Ж" * 2001, iso_two_letter="C"), + Country(description="Ж", iso_two_letter="D"), + ] + ) self.assertEqual(Country.objects.count(), 4) def test_multi_table_inheritance_unsupported(self): expected_message = "Can't bulk create a multi-table inherited model" with self.assertRaisesMessage(ValueError, expected_message): - Pizzeria.objects.bulk_create([ - Pizzeria(name="The Art of Pizza"), - ]) + Pizzeria.objects.bulk_create( + [ + Pizzeria(name="The Art of Pizza"), + ] + ) with self.assertRaisesMessage(ValueError, expected_message): - ProxyMultiCountry.objects.bulk_create([ - ProxyMultiCountry(name="Fillory", iso_two_letter="FL"), - ]) + ProxyMultiCountry.objects.bulk_create( + [ + ProxyMultiCountry(name="Fillory", iso_two_letter="FL"), + ] + ) with self.assertRaisesMessage(ValueError, expected_message): - ProxyMultiProxyCountry.objects.bulk_create([ - ProxyMultiProxyCountry(name="Fillory", iso_two_letter="FL"), - ]) + ProxyMultiProxyCountry.objects.bulk_create( + [ + ProxyMultiProxyCountry(name="Fillory", iso_two_letter="FL"), + ] + ) def test_proxy_inheritance_supported(self): - ProxyCountry.objects.bulk_create([ - ProxyCountry(name="Qwghlm", iso_two_letter="QW"), - Country(name="Tortall", iso_two_letter="TA"), - ]) - self.assertQuerysetEqual(ProxyCountry.objects.all(), { - "Qwghlm", "Tortall" - }, attrgetter("name"), ordered=False) - - ProxyProxyCountry.objects.bulk_create([ - ProxyProxyCountry(name="Netherlands", iso_two_letter="NT"), - ]) - self.assertQuerysetEqual(ProxyProxyCountry.objects.all(), { - "Qwghlm", "Tortall", "Netherlands", - }, attrgetter("name"), ordered=False) + ProxyCountry.objects.bulk_create( + [ + ProxyCountry(name="Qwghlm", iso_two_letter="QW"), + Country(name="Tortall", iso_two_letter="TA"), + ] + ) + self.assertQuerysetEqual( + ProxyCountry.objects.all(), + {"Qwghlm", "Tortall"}, + attrgetter("name"), + ordered=False, + ) + + ProxyProxyCountry.objects.bulk_create( + [ + ProxyProxyCountry(name="Netherlands", iso_two_letter="NT"), + ] + ) + self.assertQuerysetEqual( + ProxyProxyCountry.objects.all(), + { + "Qwghlm", + "Tortall", + "Netherlands", + }, + attrgetter("name"), + ordered=False, + ) def test_non_auto_increment_pk(self): - State.objects.bulk_create([ - State(two_letter_code=s) - for s in ["IL", "NY", "CA", "ME"] - ]) - self.assertQuerysetEqual(State.objects.order_by("two_letter_code"), [ - "CA", "IL", "ME", "NY", - ], attrgetter("two_letter_code")) - - @skipUnlessDBFeature('has_bulk_insert') + State.objects.bulk_create( + [State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]] + ) + self.assertQuerysetEqual( + State.objects.order_by("two_letter_code"), + [ + "CA", + "IL", + "ME", + "NY", + ], + attrgetter("two_letter_code"), + ) + + @skipUnlessDBFeature("has_bulk_insert") def test_non_auto_increment_pk_efficiency(self): with self.assertNumQueries(1): - State.objects.bulk_create([ - State(two_letter_code=s) - for s in ["IL", "NY", "CA", "ME"] - ]) - self.assertQuerysetEqual(State.objects.order_by("two_letter_code"), [ - "CA", "IL", "ME", "NY", - ], attrgetter("two_letter_code")) - - @skipIfDBFeature('allows_auto_pk_0') + State.objects.bulk_create( + [State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]] + ) + self.assertQuerysetEqual( + State.objects.order_by("two_letter_code"), + [ + "CA", + "IL", + "ME", + "NY", + ], + attrgetter("two_letter_code"), + ) + + @skipIfDBFeature("allows_auto_pk_0") def test_zero_as_autoval(self): """ Zero as id for AutoField should raise exception in MySQL, because MySQL does not allow zero for automatic primary key if the NO_AUTO_VALUE_ON_ZERO SQL mode is not enabled. """ - valid_country = Country(name='Germany', iso_two_letter='DE') - invalid_country = Country(id=0, name='Poland', iso_two_letter='PL') - msg = 'The database backend does not accept 0 as a value for AutoField.' + valid_country = Country(name="Germany", iso_two_letter="DE") + invalid_country = Country(id=0, name="Poland", iso_two_letter="PL") + msg = "The database backend does not accept 0 as a value for AutoField." with self.assertRaisesMessage(ValueError, msg): Country.objects.bulk_create([valid_country, invalid_country]) def test_batch_same_vals(self): # SQLite had a problem where all the same-valued models were # collapsed to one insert. - Restaurant.objects.bulk_create([ - Restaurant(name='foo') for i in range(0, 2) - ]) + Restaurant.objects.bulk_create([Restaurant(name="foo") for i in range(0, 2)]) self.assertEqual(Restaurant.objects.count(), 2) def test_large_batch(self): - TwoFields.objects.bulk_create([ - TwoFields(f1=i, f2=i + 1) for i in range(0, 1001) - ]) + TwoFields.objects.bulk_create( + [TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)] + ) self.assertEqual(TwoFields.objects.count(), 1001) self.assertEqual( - TwoFields.objects.filter(f1__gte=450, f1__lte=550).count(), - 101) + TwoFields.objects.filter(f1__gte=450, f1__lte=550).count(), 101 + ) self.assertEqual(TwoFields.objects.filter(f2__gte=901).count(), 101) - @skipUnlessDBFeature('has_bulk_insert') + @skipUnlessDBFeature("has_bulk_insert") def test_large_single_field_batch(self): # SQLite had a problem with more than 500 UNIONed selects in single # query. - Restaurant.objects.bulk_create([ - Restaurant() for i in range(0, 501) - ]) + Restaurant.objects.bulk_create([Restaurant() for i in range(0, 501)]) - @skipUnlessDBFeature('has_bulk_insert') + @skipUnlessDBFeature("has_bulk_insert") def test_large_batch_efficiency(self): with override_settings(DEBUG=True): connection.queries_log.clear() - TwoFields.objects.bulk_create([ - TwoFields(f1=i, f2=i + 1) for i in range(0, 1001) - ]) + TwoFields.objects.bulk_create( + [TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)] + ) self.assertLess(len(connection.queries), 10) def test_large_batch_mixed(self): @@ -169,10 +223,12 @@ class BulkCreateTests(TestCase): Test inserting a large batch with objects having primary key set mixed together with objects without PK set. """ - TwoFields.objects.bulk_create([ - TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1) - for i in range(100000, 101000) - ]) + TwoFields.objects.bulk_create( + [ + TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1) + for i in range(100000, 101000) + ] + ) self.assertEqual(TwoFields.objects.count(), 1000) # We can't assume much about the ID's created, except that the above # created IDs must exist. @@ -180,7 +236,7 @@ class BulkCreateTests(TestCase): self.assertEqual(TwoFields.objects.filter(id__in=id_range).count(), 500) self.assertEqual(TwoFields.objects.exclude(id__in=id_range).count(), 500) - @skipUnlessDBFeature('has_bulk_insert') + @skipUnlessDBFeature("has_bulk_insert") def test_large_batch_mixed_efficiency(self): """ Test inserting a large batch with objects having primary key set @@ -188,9 +244,12 @@ class BulkCreateTests(TestCase): """ with override_settings(DEBUG=True): connection.queries_log.clear() - TwoFields.objects.bulk_create([ - TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1) - for i in range(100000, 101000)]) + TwoFields.objects.bulk_create( + [ + TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1) + for i in range(100000, 101000) + ] + ) self.assertLess(len(connection.queries), 10) def test_explicit_batch_size(self): @@ -212,7 +271,7 @@ class BulkCreateTests(TestCase): NoFields.objects.bulk_create([NoFields() for i in range(2)]) self.assertEqual(NoFields.objects.count(), 2) - @skipUnlessDBFeature('has_bulk_insert') + @skipUnlessDBFeature("has_bulk_insert") def test_explicit_batch_size_efficiency(self): objs = [TwoFields(f1=i, f2=i) for i in range(0, 100)] with self.assertNumQueries(2): @@ -221,50 +280,59 @@ class BulkCreateTests(TestCase): with self.assertNumQueries(1): TwoFields.objects.bulk_create(objs, len(objs)) - @skipUnlessDBFeature('has_bulk_insert') + @skipUnlessDBFeature("has_bulk_insert") def test_explicit_batch_size_respects_max_batch_size(self): - objs = [Country(name=f'Country {i}') for i in range(1000)] - fields = ['name', 'iso_two_letter', 'description'] + objs = [Country(name=f"Country {i}") for i in range(1000)] + fields = ["name", "iso_two_letter", "description"] max_batch_size = max(connection.ops.bulk_batch_size(fields, objs), 1) with self.assertNumQueries(ceil(len(objs) / max_batch_size)): Country.objects.bulk_create(objs, batch_size=max_batch_size + 1) - @skipUnlessDBFeature('has_bulk_insert') + @skipUnlessDBFeature("has_bulk_insert") def test_bulk_insert_expressions(self): - Restaurant.objects.bulk_create([ - Restaurant(name="Sam's Shake Shack"), - Restaurant(name=Lower(Value("Betty's Beetroot Bar"))) - ]) + Restaurant.objects.bulk_create( + [ + Restaurant(name="Sam's Shake Shack"), + Restaurant(name=Lower(Value("Betty's Beetroot Bar"))), + ] + ) bbb = Restaurant.objects.filter(name="betty's beetroot bar") self.assertEqual(bbb.count(), 1) - @skipUnlessDBFeature('has_bulk_insert') + @skipUnlessDBFeature("has_bulk_insert") def test_bulk_insert_nullable_fields(self): fk_to_auto_fields = { - 'auto_field': NoFields.objects.create(), - 'small_auto_field': SmallAutoFieldModel.objects.create(), - 'big_auto_field': BigAutoFieldModel.objects.create(), + "auto_field": NoFields.objects.create(), + "small_auto_field": SmallAutoFieldModel.objects.create(), + "big_auto_field": BigAutoFieldModel.objects.create(), } # NULL can be mixed with other values in nullable fields - nullable_fields = [field for field in NullableFields._meta.get_fields() if field.name != 'id'] - NullableFields.objects.bulk_create([ - NullableFields(**{**fk_to_auto_fields, field.name: None}) - for field in nullable_fields - ]) + nullable_fields = [ + field for field in NullableFields._meta.get_fields() if field.name != "id" + ] + NullableFields.objects.bulk_create( + [ + NullableFields(**{**fk_to_auto_fields, field.name: None}) + for field in nullable_fields + ] + ) self.assertEqual(NullableFields.objects.count(), len(nullable_fields)) for field in nullable_fields: with self.subTest(field=field): - field_value = '' if isinstance(field, FileField) else None - self.assertEqual(NullableFields.objects.filter(**{field.name: field_value}).count(), 1) + field_value = "" if isinstance(field, FileField) else None + self.assertEqual( + NullableFields.objects.filter(**{field.name: field_value}).count(), + 1, + ) - @skipUnlessDBFeature('can_return_rows_from_bulk_insert') + @skipUnlessDBFeature("can_return_rows_from_bulk_insert") def test_set_pk_and_insert_single_item(self): with self.assertNumQueries(1): countries = Country.objects.bulk_create([self.data[0]]) self.assertEqual(len(countries), 1) self.assertEqual(Country.objects.get(pk=countries[0].pk), countries[0]) - @skipUnlessDBFeature('can_return_rows_from_bulk_insert') + @skipUnlessDBFeature("can_return_rows_from_bulk_insert") def test_set_pk_and_query_efficiency(self): with self.assertNumQueries(1): countries = Country.objects.bulk_create(self.data) @@ -274,10 +342,10 @@ class BulkCreateTests(TestCase): self.assertEqual(Country.objects.get(pk=countries[2].pk), countries[2]) self.assertEqual(Country.objects.get(pk=countries[3].pk), countries[3]) - @skipUnlessDBFeature('can_return_rows_from_bulk_insert') + @skipUnlessDBFeature("can_return_rows_from_bulk_insert") def test_set_state(self): - country_nl = Country(name='Netherlands', iso_two_letter='NL') - country_be = Country(name='Belgium', iso_two_letter='BE') + country_nl = Country(name="Netherlands", iso_two_letter="NL") + country_be = Country(name="Belgium", iso_two_letter="BE") Country.objects.bulk_create([country_nl]) country_be.save() # Objects save via bulk_create() and save() should have equal state. @@ -285,21 +353,21 @@ class BulkCreateTests(TestCase): self.assertEqual(country_nl._state.db, country_be._state.db) def test_set_state_with_pk_specified(self): - state_ca = State(two_letter_code='CA') - state_ny = State(two_letter_code='NY') + state_ca = State(two_letter_code="CA") + state_ny = State(two_letter_code="NY") State.objects.bulk_create([state_ca]) state_ny.save() # Objects save via bulk_create() and save() should have equal state. self.assertEqual(state_ca._state.adding, state_ny._state.adding) self.assertEqual(state_ca._state.db, state_ny._state.db) - @skipIfDBFeature('supports_ignore_conflicts') + @skipIfDBFeature("supports_ignore_conflicts") def test_ignore_conflicts_value_error(self): - message = 'This database backend does not support ignoring conflicts.' + message = "This database backend does not support ignoring conflicts." with self.assertRaisesMessage(NotSupportedError, message): TwoFields.objects.bulk_create(self.data, ignore_conflicts=True) - @skipUnlessDBFeature('supports_ignore_conflicts') + @skipUnlessDBFeature("supports_ignore_conflicts") def test_ignore_conflicts_ignore(self): data = [ TwoFields(f1=1, f2=1), @@ -320,7 +388,9 @@ class BulkCreateTests(TestCase): self.assertIsNone(conflicting_objects[1].pk) # New objects are created and conflicts are ignored. new_object = TwoFields(f1=4, f2=4) - TwoFields.objects.bulk_create(conflicting_objects + [new_object], ignore_conflicts=True) + TwoFields.objects.bulk_create( + conflicting_objects + [new_object], ignore_conflicts=True + ) self.assertEqual(TwoFields.objects.count(), 4) self.assertIsNone(new_object.pk) # Without ignore_conflicts=True, there's a problem. @@ -335,7 +405,7 @@ class BulkCreateTests(TestCase): child = NullableFields.objects.get(integer_field=88) self.assertEqual(child.auto_field, parent) - @skipUnlessDBFeature('can_return_rows_from_bulk_insert') + @skipUnlessDBFeature("can_return_rows_from_bulk_insert") def test_nullable_fk_after_parent_bulk_create(self): parent = NoFields() child = NullableFields(auto_field=parent, integer_field=88) @@ -354,19 +424,19 @@ class BulkCreateTests(TestCase): NullableFields.objects.bulk_create([NullableFields(auto_field=parent)]) def test_invalid_batch_size_exception(self): - msg = 'Batch size must be a positive integer.' + msg = "Batch size must be a positive integer." with self.assertRaisesMessage(ValueError, msg): Country.objects.bulk_create([], batch_size=-1) - @skipIfDBFeature('supports_update_conflicts') + @skipIfDBFeature("supports_update_conflicts") def test_update_conflicts_unsupported(self): - msg = 'This database backend does not support updating conflicts.' + msg = "This database backend does not support updating conflicts." with self.assertRaisesMessage(NotSupportedError, msg): Country.objects.bulk_create(self.data, update_conflicts=True) - @skipUnlessDBFeature('supports_ignore_conflicts', 'supports_update_conflicts') + @skipUnlessDBFeature("supports_ignore_conflicts", "supports_update_conflicts") def test_ignore_update_conflicts_exclusive(self): - msg = 'ignore_conflicts and update_conflicts are mutually exclusive' + msg = "ignore_conflicts and update_conflicts are mutually exclusive" with self.assertRaisesMessage(ValueError, msg): Country.objects.bulk_create( self.data, @@ -374,155 +444,166 @@ class BulkCreateTests(TestCase): update_conflicts=True, ) - @skipUnlessDBFeature('supports_update_conflicts') + @skipUnlessDBFeature("supports_update_conflicts") def test_update_conflicts_no_update_fields(self): msg = ( - 'Fields that will be updated when a row insertion fails on ' - 'conflicts must be provided.' + "Fields that will be updated when a row insertion fails on " + "conflicts must be provided." ) with self.assertRaisesMessage(ValueError, msg): Country.objects.bulk_create(self.data, update_conflicts=True) - @skipUnlessDBFeature('supports_update_conflicts') - @skipIfDBFeature('supports_update_conflicts_with_target') + @skipUnlessDBFeature("supports_update_conflicts") + @skipIfDBFeature("supports_update_conflicts_with_target") def test_update_conflicts_unique_field_unsupported(self): msg = ( - 'This database backend does not support updating conflicts with ' - 'specifying unique fields that can trigger the upsert.' + "This database backend does not support updating conflicts with " + "specifying unique fields that can trigger the upsert." ) with self.assertRaisesMessage(NotSupportedError, msg): TwoFields.objects.bulk_create( [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)], update_conflicts=True, - update_fields=['f2'], - unique_fields=['f1'], + update_fields=["f2"], + unique_fields=["f1"], ) - @skipUnlessDBFeature('supports_update_conflicts') + @skipUnlessDBFeature("supports_update_conflicts") def test_update_conflicts_nonexistent_update_fields(self): unique_fields = None if connection.features.supports_update_conflicts_with_target: - unique_fields = ['f1'] + unique_fields = ["f1"] msg = "TwoFields has no field named 'nonexistent'" with self.assertRaisesMessage(FieldDoesNotExist, msg): TwoFields.objects.bulk_create( [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)], update_conflicts=True, - update_fields=['nonexistent'], + update_fields=["nonexistent"], unique_fields=unique_fields, ) @skipUnlessDBFeature( - 'supports_update_conflicts', 'supports_update_conflicts_with_target', + "supports_update_conflicts", + "supports_update_conflicts_with_target", ) def test_update_conflicts_unique_fields_required(self): - msg = 'Unique fields that can trigger the upsert must be provided.' + msg = "Unique fields that can trigger the upsert must be provided." with self.assertRaisesMessage(ValueError, msg): TwoFields.objects.bulk_create( [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)], update_conflicts=True, - update_fields=['f1'], + update_fields=["f1"], ) @skipUnlessDBFeature( - 'supports_update_conflicts', 'supports_update_conflicts_with_target', + "supports_update_conflicts", + "supports_update_conflicts_with_target", ) def test_update_conflicts_invalid_update_fields(self): - msg = ( - 'bulk_create() can only be used with concrete fields in update_fields.' - ) + msg = "bulk_create() can only be used with concrete fields in update_fields." # Reverse one-to-one relationship. with self.assertRaisesMessage(ValueError, msg): Country.objects.bulk_create( self.data, update_conflicts=True, - update_fields=['relatedmodel'], - unique_fields=['pk'], + update_fields=["relatedmodel"], + unique_fields=["pk"], ) # Many-to-many relationship. with self.assertRaisesMessage(ValueError, msg): RelatedModel.objects.bulk_create( [RelatedModel(country=self.data[0])], update_conflicts=True, - update_fields=['big_auto_fields'], - unique_fields=['country'], + update_fields=["big_auto_fields"], + unique_fields=["country"], ) @skipUnlessDBFeature( - 'supports_update_conflicts', 'supports_update_conflicts_with_target', + "supports_update_conflicts", + "supports_update_conflicts_with_target", ) def test_update_conflicts_pk_in_update_fields(self): - msg = 'bulk_create() cannot be used with primary keys in update_fields.' + msg = "bulk_create() cannot be used with primary keys in update_fields." with self.assertRaisesMessage(ValueError, msg): BigAutoFieldModel.objects.bulk_create( [BigAutoFieldModel()], update_conflicts=True, - update_fields=['id'], - unique_fields=['id'], + update_fields=["id"], + unique_fields=["id"], ) @skipUnlessDBFeature( - 'supports_update_conflicts', 'supports_update_conflicts_with_target', + "supports_update_conflicts", + "supports_update_conflicts_with_target", ) def test_update_conflicts_invalid_unique_fields(self): - msg = ( - 'bulk_create() can only be used with concrete fields in unique_fields.' - ) + msg = "bulk_create() can only be used with concrete fields in unique_fields." # Reverse one-to-one relationship. with self.assertRaisesMessage(ValueError, msg): Country.objects.bulk_create( self.data, update_conflicts=True, - update_fields=['name'], - unique_fields=['relatedmodel'], + update_fields=["name"], + unique_fields=["relatedmodel"], ) # Many-to-many relationship. with self.assertRaisesMessage(ValueError, msg): RelatedModel.objects.bulk_create( [RelatedModel(country=self.data[0])], update_conflicts=True, - update_fields=['name'], - unique_fields=['big_auto_fields'], + update_fields=["name"], + unique_fields=["big_auto_fields"], ) def _test_update_conflicts_two_fields(self, unique_fields): - TwoFields.objects.bulk_create([ - TwoFields(f1=1, f2=1, name='a'), - TwoFields(f1=2, f2=2, name='b'), - ]) + TwoFields.objects.bulk_create( + [ + TwoFields(f1=1, f2=1, name="a"), + TwoFields(f1=2, f2=2, name="b"), + ] + ) self.assertEqual(TwoFields.objects.count(), 2) conflicting_objects = [ - TwoFields(f1=1, f2=1, name='c'), - TwoFields(f1=2, f2=2, name='d'), + TwoFields(f1=1, f2=1, name="c"), + TwoFields(f1=2, f2=2, name="d"), ] TwoFields.objects.bulk_create( conflicting_objects, update_conflicts=True, unique_fields=unique_fields, - update_fields=['name'], + update_fields=["name"], ) self.assertEqual(TwoFields.objects.count(), 2) - self.assertCountEqual(TwoFields.objects.values('f1', 'f2', 'name'), [ - {'f1': 1, 'f2': 1, 'name': 'c'}, - {'f1': 2, 'f2': 2, 'name': 'd'}, - ]) + self.assertCountEqual( + TwoFields.objects.values("f1", "f2", "name"), + [ + {"f1": 1, "f2": 1, "name": "c"}, + {"f1": 2, "f2": 2, "name": "d"}, + ], + ) - @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + @skipUnlessDBFeature( + "supports_update_conflicts", "supports_update_conflicts_with_target" + ) def test_update_conflicts_two_fields_unique_fields_first(self): - self._test_update_conflicts_two_fields(['f1']) + self._test_update_conflicts_two_fields(["f1"]) - @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + @skipUnlessDBFeature( + "supports_update_conflicts", "supports_update_conflicts_with_target" + ) def test_update_conflicts_two_fields_unique_fields_second(self): - self._test_update_conflicts_two_fields(['f2']) + self._test_update_conflicts_two_fields(["f2"]) - @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + @skipUnlessDBFeature( + "supports_update_conflicts", "supports_update_conflicts_with_target" + ) def test_update_conflicts_two_fields_unique_fields_both(self): with self.assertRaises((OperationalError, ProgrammingError)): - self._test_update_conflicts_two_fields(['f1', 'f2']) + self._test_update_conflicts_two_fields(["f1", "f2"]) - @skipUnlessDBFeature('supports_update_conflicts') - @skipIfDBFeature('supports_update_conflicts_with_target') + @skipUnlessDBFeature("supports_update_conflicts") + @skipIfDBFeature("supports_update_conflicts_with_target") def test_update_conflicts_two_fields_no_unique_fields(self): self._test_update_conflicts_two_fields([]) @@ -532,99 +613,129 @@ class BulkCreateTests(TestCase): new_data = [ # Conflicting countries. - Country(name='Germany', iso_two_letter='DE', description=( - 'Germany is a country in Central Europe.' - )), - Country(name='Czech Republic', iso_two_letter='CZ', description=( - 'The Czech Republic is a landlocked country in Central Europe.' - )), + Country( + name="Germany", + iso_two_letter="DE", + description=("Germany is a country in Central Europe."), + ), + Country( + name="Czech Republic", + iso_two_letter="CZ", + description=( + "The Czech Republic is a landlocked country in Central Europe." + ), + ), # New countries. - Country(name='Australia', iso_two_letter='AU'), - Country(name='Japan', iso_two_letter='JP', description=( - 'Japan is an island country in East Asia.' - )), + Country(name="Australia", iso_two_letter="AU"), + Country( + name="Japan", + iso_two_letter="JP", + description=("Japan is an island country in East Asia."), + ), ] Country.objects.bulk_create( new_data, update_conflicts=True, - update_fields=['description'], + update_fields=["description"], unique_fields=unique_fields, ) self.assertEqual(Country.objects.count(), 6) - self.assertCountEqual(Country.objects.values('iso_two_letter', 'description'), [ - {'iso_two_letter': 'US', 'description': ''}, - {'iso_two_letter': 'NL', 'description': ''}, - {'iso_two_letter': 'DE', 'description': ( - 'Germany is a country in Central Europe.' - )}, - {'iso_two_letter': 'CZ', 'description': ( - 'The Czech Republic is a landlocked country in Central Europe.' - )}, - {'iso_two_letter': 'AU', 'description': ''}, - {'iso_two_letter': 'JP', 'description': ( - 'Japan is an island country in East Asia.' - )}, - ]) - - @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + self.assertCountEqual( + Country.objects.values("iso_two_letter", "description"), + [ + {"iso_two_letter": "US", "description": ""}, + {"iso_two_letter": "NL", "description": ""}, + { + "iso_two_letter": "DE", + "description": ("Germany is a country in Central Europe."), + }, + { + "iso_two_letter": "CZ", + "description": ( + "The Czech Republic is a landlocked country in Central Europe." + ), + }, + {"iso_two_letter": "AU", "description": ""}, + { + "iso_two_letter": "JP", + "description": ("Japan is an island country in East Asia."), + }, + ], + ) + + @skipUnlessDBFeature( + "supports_update_conflicts", "supports_update_conflicts_with_target" + ) def test_update_conflicts_unique_two_fields_unique_fields_both(self): - self._test_update_conflicts_unique_two_fields(['iso_two_letter', 'name']) + self._test_update_conflicts_unique_two_fields(["iso_two_letter", "name"]) - @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + @skipUnlessDBFeature( + "supports_update_conflicts", "supports_update_conflicts_with_target" + ) def test_update_conflicts_unique_two_fields_unique_fields_one(self): with self.assertRaises((OperationalError, ProgrammingError)): - self._test_update_conflicts_unique_two_fields(['iso_two_letter']) + self._test_update_conflicts_unique_two_fields(["iso_two_letter"]) - @skipUnlessDBFeature('supports_update_conflicts') - @skipIfDBFeature('supports_update_conflicts_with_target') + @skipUnlessDBFeature("supports_update_conflicts") + @skipIfDBFeature("supports_update_conflicts_with_target") def test_update_conflicts_unique_two_fields_unique_no_unique_fields(self): self._test_update_conflicts_unique_two_fields([]) def _test_update_conflicts(self, unique_fields): - UpsertConflict.objects.bulk_create([ - UpsertConflict(number=1, rank=1, name='John'), - UpsertConflict(number=2, rank=2, name='Mary'), - UpsertConflict(number=3, rank=3, name='Hannah'), - ]) + UpsertConflict.objects.bulk_create( + [ + UpsertConflict(number=1, rank=1, name="John"), + UpsertConflict(number=2, rank=2, name="Mary"), + UpsertConflict(number=3, rank=3, name="Hannah"), + ] + ) self.assertEqual(UpsertConflict.objects.count(), 3) conflicting_objects = [ - UpsertConflict(number=1, rank=4, name='Steve'), - UpsertConflict(number=2, rank=2, name='Olivia'), - UpsertConflict(number=3, rank=1, name='Hannah'), + UpsertConflict(number=1, rank=4, name="Steve"), + UpsertConflict(number=2, rank=2, name="Olivia"), + UpsertConflict(number=3, rank=1, name="Hannah"), ] UpsertConflict.objects.bulk_create( conflicting_objects, update_conflicts=True, - update_fields=['name', 'rank'], + update_fields=["name", "rank"], unique_fields=unique_fields, ) self.assertEqual(UpsertConflict.objects.count(), 3) - self.assertCountEqual(UpsertConflict.objects.values('number', 'rank', 'name'), [ - {'number': 1, 'rank': 4, 'name': 'Steve'}, - {'number': 2, 'rank': 2, 'name': 'Olivia'}, - {'number': 3, 'rank': 1, 'name': 'Hannah'}, - ]) + self.assertCountEqual( + UpsertConflict.objects.values("number", "rank", "name"), + [ + {"number": 1, "rank": 4, "name": "Steve"}, + {"number": 2, "rank": 2, "name": "Olivia"}, + {"number": 3, "rank": 1, "name": "Hannah"}, + ], + ) UpsertConflict.objects.bulk_create( - conflicting_objects + [UpsertConflict(number=4, rank=4, name='Mark')], + conflicting_objects + [UpsertConflict(number=4, rank=4, name="Mark")], update_conflicts=True, - update_fields=['name', 'rank'], + update_fields=["name", "rank"], unique_fields=unique_fields, ) self.assertEqual(UpsertConflict.objects.count(), 4) - self.assertCountEqual(UpsertConflict.objects.values('number', 'rank', 'name'), [ - {'number': 1, 'rank': 4, 'name': 'Steve'}, - {'number': 2, 'rank': 2, 'name': 'Olivia'}, - {'number': 3, 'rank': 1, 'name': 'Hannah'}, - {'number': 4, 'rank': 4, 'name': 'Mark'}, - ]) - - @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + self.assertCountEqual( + UpsertConflict.objects.values("number", "rank", "name"), + [ + {"number": 1, "rank": 4, "name": "Steve"}, + {"number": 2, "rank": 2, "name": "Olivia"}, + {"number": 3, "rank": 1, "name": "Hannah"}, + {"number": 4, "rank": 4, "name": "Mark"}, + ], + ) + + @skipUnlessDBFeature( + "supports_update_conflicts", "supports_update_conflicts_with_target" + ) def test_update_conflicts_unique_fields(self): - self._test_update_conflicts(unique_fields=['number']) + self._test_update_conflicts(unique_fields=["number"]) - @skipUnlessDBFeature('supports_update_conflicts') - @skipIfDBFeature('supports_update_conflicts_with_target') + @skipUnlessDBFeature("supports_update_conflicts") + @skipIfDBFeature("supports_update_conflicts_with_target") def test_update_conflicts_no_unique_fields(self): self._test_update_conflicts([]) |