diff options
author | sean_c_hsu <s8901489@gmail.com> | 2020-06-15 00:58:06 +0800 |
---|---|---|
committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-01-19 20:17:42 +0100 |
commit | 0f6946495a8ec955b471ca1baaf408ceb53d4796 (patch) | |
tree | c3d867d141074fb990d3140e02fbaa9d717e8ff2 /tests/bulk_create | |
parent | ba9de2e74edb155067dc96a3369305c9ef4ba385 (diff) | |
download | django-0f6946495a8ec955b471ca1baaf408ceb53d4796.tar.gz |
Fixed #31685 -- Added support for updating conflicts to QuerySet.bulk_create().
Thanks Florian Apolloner, Chris Jerdonek, Hannes Ljungberg, Nick Pope,
and Mariusz Felisiak for reviews.
Diffstat (limited to 'tests/bulk_create')
-rw-r--r-- | tests/bulk_create/models.py | 21 | ||||
-rw-r--r-- | tests/bulk_create/tests.py | 292 |
2 files changed, 306 insertions, 7 deletions
diff --git a/tests/bulk_create/models.py b/tests/bulk_create/models.py index 586457b192..f0db69932e 100644 --- a/tests/bulk_create/models.py +++ b/tests/bulk_create/models.py @@ -16,6 +16,14 @@ class Country(models.Model): iso_two_letter = models.CharField(max_length=2) description = models.TextField() + class Meta: + constraints = [ + models.UniqueConstraint( + fields=['iso_two_letter', 'name'], + name='country_name_iso_unique', + ), + ] + class ProxyCountry(Country): class Meta: @@ -58,6 +66,13 @@ class State(models.Model): class TwoFields(models.Model): f1 = models.IntegerField(unique=True) f2 = models.IntegerField(unique=True) + name = models.CharField(max_length=15, null=True) + + +class UpsertConflict(models.Model): + number = models.IntegerField(unique=True) + rank = models.IntegerField() + name = models.CharField(max_length=15) class NoFields(models.Model): @@ -103,3 +118,9 @@ class NullableFields(models.Model): text_field = models.TextField(null=True, default='text') url_field = models.URLField(null=True, default='/') uuid_field = models.UUIDField(null=True, default=uuid.uuid4) + + +class RelatedModel(models.Model): + name = models.CharField(max_length=15, null=True) + country = models.OneToOneField(Country, models.CASCADE, primary_key=True) + big_auto_fields = models.ManyToManyField(BigAutoFieldModel) diff --git a/tests/bulk_create/tests.py b/tests/bulk_create/tests.py index 2ee54c382f..7e5ff32380 100644 --- a/tests/bulk_create/tests.py +++ b/tests/bulk_create/tests.py @@ -1,7 +1,11 @@ from math import ceil from operator import attrgetter -from django.db import IntegrityError, NotSupportedError, connection +from django.core.exceptions import FieldDoesNotExist +from django.db import ( + IntegrityError, NotSupportedError, OperationalError, ProgrammingError, + connection, +) from django.db.models import FileField, Value from django.db.models.functions import Lower from django.test import ( @@ -11,7 +15,8 @@ from django.test import ( from .models import ( BigAutoFieldModel, Country, NoFields, NullableFields, Pizzeria, ProxyCountry, ProxyMultiCountry, ProxyMultiProxyCountry, ProxyProxyCountry, - Restaurant, SmallAutoFieldModel, State, TwoFields, + RelatedModel, Restaurant, SmallAutoFieldModel, State, TwoFields, + UpsertConflict, ) @@ -53,10 +58,10 @@ class BulkCreateTests(TestCase): @skipUnlessDBFeature('has_bulk_insert') def test_long_and_short_text(self): Country.objects.bulk_create([ - Country(description='a' * 4001), - Country(description='a'), - Country(description='Ж' * 2001), - Country(description='Ж'), + Country(description='a' * 4001, iso_two_letter='A'), + Country(description='a', iso_two_letter='B'), + Country(description='Ж' * 2001, iso_two_letter='C'), + Country(description='Ж', iso_two_letter='D'), ]) self.assertEqual(Country.objects.count(), 4) @@ -218,7 +223,7 @@ class BulkCreateTests(TestCase): @skipUnlessDBFeature('has_bulk_insert') def test_explicit_batch_size_respects_max_batch_size(self): - objs = [Country() for i in range(1000)] + objs = [Country(name=f'Country {i}') for i in range(1000)] fields = ['name', 'iso_two_letter', 'description'] max_batch_size = max(connection.ops.bulk_batch_size(fields, objs), 1) with self.assertNumQueries(ceil(len(objs) / max_batch_size)): @@ -352,3 +357,276 @@ class BulkCreateTests(TestCase): msg = 'Batch size must be a positive integer.' with self.assertRaisesMessage(ValueError, msg): Country.objects.bulk_create([], batch_size=-1) + + @skipIfDBFeature('supports_update_conflicts') + def test_update_conflicts_unsupported(self): + msg = 'This database backend does not support updating conflicts.' + with self.assertRaisesMessage(NotSupportedError, msg): + Country.objects.bulk_create(self.data, update_conflicts=True) + + @skipUnlessDBFeature('supports_ignore_conflicts', 'supports_update_conflicts') + def test_ignore_update_conflicts_exclusive(self): + msg = 'ignore_conflicts and update_conflicts are mutually exclusive' + with self.assertRaisesMessage(ValueError, msg): + Country.objects.bulk_create( + self.data, + ignore_conflicts=True, + update_conflicts=True, + ) + + @skipUnlessDBFeature('supports_update_conflicts') + def test_update_conflicts_no_update_fields(self): + msg = ( + 'Fields that will be updated when a row insertion fails on ' + 'conflicts must be provided.' + ) + with self.assertRaisesMessage(ValueError, msg): + Country.objects.bulk_create(self.data, update_conflicts=True) + + @skipUnlessDBFeature('supports_update_conflicts') + @skipIfDBFeature('supports_update_conflicts_with_target') + def test_update_conflicts_unique_field_unsupported(self): + msg = ( + 'This database backend does not support updating conflicts with ' + 'specifying unique fields that can trigger the upsert.' + ) + with self.assertRaisesMessage(NotSupportedError, msg): + TwoFields.objects.bulk_create( + [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)], + update_conflicts=True, + update_fields=['f2'], + unique_fields=['f1'], + ) + + @skipUnlessDBFeature('supports_update_conflicts') + def test_update_conflicts_nonexistent_update_fields(self): + unique_fields = None + if connection.features.supports_update_conflicts_with_target: + unique_fields = ['f1'] + msg = "TwoFields has no field named 'nonexistent'" + with self.assertRaisesMessage(FieldDoesNotExist, msg): + TwoFields.objects.bulk_create( + [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)], + update_conflicts=True, + update_fields=['nonexistent'], + unique_fields=unique_fields, + ) + + @skipUnlessDBFeature( + 'supports_update_conflicts', 'supports_update_conflicts_with_target', + ) + def test_update_conflicts_unique_fields_required(self): + msg = 'Unique fields that can trigger the upsert must be provided.' + with self.assertRaisesMessage(ValueError, msg): + TwoFields.objects.bulk_create( + [TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)], + update_conflicts=True, + update_fields=['f1'], + ) + + @skipUnlessDBFeature( + 'supports_update_conflicts', 'supports_update_conflicts_with_target', + ) + def test_update_conflicts_invalid_update_fields(self): + msg = ( + 'bulk_create() can only be used with concrete fields in ' + 'update_fields.' + ) + # Reverse one-to-one relationship. + with self.assertRaisesMessage(ValueError, msg): + Country.objects.bulk_create( + self.data, + update_conflicts=True, + update_fields=['relatedmodel'], + unique_fields=['pk'], + ) + # Many-to-many relationship. + with self.assertRaisesMessage(ValueError, msg): + RelatedModel.objects.bulk_create( + [RelatedModel(country=self.data[0])], + update_conflicts=True, + update_fields=['big_auto_fields'], + unique_fields=['country'], + ) + + @skipUnlessDBFeature( + 'supports_update_conflicts', 'supports_update_conflicts_with_target', + ) + def test_update_conflicts_pk_in_update_fields(self): + msg = 'bulk_create() cannot be used with primary keys in update_fields.' + with self.assertRaisesMessage(ValueError, msg): + BigAutoFieldModel.objects.bulk_create( + [BigAutoFieldModel()], + update_conflicts=True, + update_fields=['id'], + unique_fields=['id'], + ) + + @skipUnlessDBFeature( + 'supports_update_conflicts', 'supports_update_conflicts_with_target', + ) + def test_update_conflicts_invalid_unique_fields(self): + msg = ( + 'bulk_create() can only be used with concrete fields in ' + 'unique_fields.' + ) + # Reverse one-to-one relationship. + with self.assertRaisesMessage(ValueError, msg): + Country.objects.bulk_create( + self.data, + update_conflicts=True, + update_fields=['name'], + unique_fields=['relatedmodel'], + ) + # Many-to-many relationship. + with self.assertRaisesMessage(ValueError, msg): + RelatedModel.objects.bulk_create( + [RelatedModel(country=self.data[0])], + update_conflicts=True, + update_fields=['name'], + unique_fields=['big_auto_fields'], + ) + + def _test_update_conflicts_two_fields(self, unique_fields): + TwoFields.objects.bulk_create([ + TwoFields(f1=1, f2=1, name='a'), + TwoFields(f1=2, f2=2, name='b'), + ]) + self.assertEqual(TwoFields.objects.count(), 2) + + conflicting_objects = [ + TwoFields(f1=1, f2=1, name='c'), + TwoFields(f1=2, f2=2, name='d'), + ] + TwoFields.objects.bulk_create( + conflicting_objects, + update_conflicts=True, + unique_fields=unique_fields, + update_fields=['name'], + ) + self.assertEqual(TwoFields.objects.count(), 2) + self.assertCountEqual(TwoFields.objects.values('f1', 'f2', 'name'), [ + {'f1': 1, 'f2': 1, 'name': 'c'}, + {'f1': 2, 'f2': 2, 'name': 'd'}, + ]) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_two_fields_unique_fields_first(self): + self._test_update_conflicts_two_fields(['f1']) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_two_fields_unique_fields_second(self): + self._test_update_conflicts_two_fields(['f2']) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_two_fields_unique_fields_both(self): + with self.assertRaises((OperationalError, ProgrammingError)): + self._test_update_conflicts_two_fields(['f1', 'f2']) + + @skipUnlessDBFeature('supports_update_conflicts') + @skipIfDBFeature('supports_update_conflicts_with_target') + def test_update_conflicts_two_fields_no_unique_fields(self): + self._test_update_conflicts_two_fields([]) + + def _test_update_conflicts_unique_two_fields(self, unique_fields): + Country.objects.bulk_create(self.data) + self.assertEqual(Country.objects.count(), 4) + + new_data = [ + # Conflicting countries. + Country(name='Germany', iso_two_letter='DE', description=( + 'Germany is a country in Central Europe.' + )), + Country(name='Czech Republic', iso_two_letter='CZ', description=( + 'The Czech Republic is a landlocked country in Central Europe.' + )), + # New countries. + Country(name='Australia', iso_two_letter='AU'), + Country(name='Japan', iso_two_letter='JP', description=( + 'Japan is an island country in East Asia.' + )), + ] + Country.objects.bulk_create( + new_data, + update_conflicts=True, + update_fields=['description'], + unique_fields=unique_fields, + ) + self.assertEqual(Country.objects.count(), 6) + self.assertCountEqual(Country.objects.values('iso_two_letter', 'description'), [ + {'iso_two_letter': 'US', 'description': ''}, + {'iso_two_letter': 'NL', 'description': ''}, + {'iso_two_letter': 'DE', 'description': ( + 'Germany is a country in Central Europe.' + )}, + {'iso_two_letter': 'CZ', 'description': ( + 'The Czech Republic is a landlocked country in Central Europe.' + )}, + {'iso_two_letter': 'AU', 'description': ''}, + {'iso_two_letter': 'JP', 'description': ( + 'Japan is an island country in East Asia.' + )}, + ]) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_unique_two_fields_unique_fields_both(self): + self._test_update_conflicts_unique_two_fields(['iso_two_letter', 'name']) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_unique_two_fields_unique_fields_one(self): + with self.assertRaises((OperationalError, ProgrammingError)): + self._test_update_conflicts_unique_two_fields(['iso_two_letter']) + + @skipUnlessDBFeature('supports_update_conflicts') + @skipIfDBFeature('supports_update_conflicts_with_target') + def test_update_conflicts_unique_two_fields_unique_no_unique_fields(self): + self._test_update_conflicts_unique_two_fields([]) + + def _test_update_conflicts(self, unique_fields): + UpsertConflict.objects.bulk_create([ + UpsertConflict(number=1, rank=1, name='John'), + UpsertConflict(number=2, rank=2, name='Mary'), + UpsertConflict(number=3, rank=3, name='Hannah'), + ]) + self.assertEqual(UpsertConflict.objects.count(), 3) + + conflicting_objects = [ + UpsertConflict(number=1, rank=4, name='Steve'), + UpsertConflict(number=2, rank=2, name='Olivia'), + UpsertConflict(number=3, rank=1, name='Hannah'), + ] + UpsertConflict.objects.bulk_create( + conflicting_objects, + update_conflicts=True, + update_fields=['name', 'rank'], + unique_fields=unique_fields, + ) + self.assertEqual(UpsertConflict.objects.count(), 3) + self.assertCountEqual(UpsertConflict.objects.values('number', 'rank', 'name'), [ + {'number': 1, 'rank': 4, 'name': 'Steve'}, + {'number': 2, 'rank': 2, 'name': 'Olivia'}, + {'number': 3, 'rank': 1, 'name': 'Hannah'}, + ]) + + UpsertConflict.objects.bulk_create( + conflicting_objects + [UpsertConflict(number=4, rank=4, name='Mark')], + update_conflicts=True, + update_fields=['name', 'rank'], + unique_fields=unique_fields, + ) + self.assertEqual(UpsertConflict.objects.count(), 4) + self.assertCountEqual(UpsertConflict.objects.values('number', 'rank', 'name'), [ + {'number': 1, 'rank': 4, 'name': 'Steve'}, + {'number': 2, 'rank': 2, 'name': 'Olivia'}, + {'number': 3, 'rank': 1, 'name': 'Hannah'}, + {'number': 4, 'rank': 4, 'name': 'Mark'}, + ]) + + @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target') + def test_update_conflicts_unique_fields(self): + self._test_update_conflicts(unique_fields=['number']) + + @skipUnlessDBFeature('supports_update_conflicts') + @skipIfDBFeature('supports_update_conflicts_with_target') + def test_update_conflicts_no_unique_fields(self): + self._test_update_conflicts([]) |