summaryrefslogtreecommitdiff
path: root/tests/bulk_create
diff options
context:
space:
mode:
authordjango-bot <ops@djangoproject.com>2022-02-03 20:24:19 +0100
committerMariusz Felisiak <felisiak.mariusz@gmail.com>2022-02-07 20:37:05 +0100
commit9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch)
treef0506b668a013d0063e5fba3dbf4863b466713ba /tests/bulk_create
parentf68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff)
downloaddjango-9c19aff7c7561e3a82978a272ecdaad40dda5c00.tar.gz
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/bulk_create')
-rw-r--r--tests/bulk_create/models.py46
-rw-r--r--tests/bulk_create/tests.py585
2 files changed, 377 insertions, 254 deletions
diff --git a/tests/bulk_create/models.py b/tests/bulk_create/models.py
index f0db69932e..27abc416bd 100644
--- a/tests/bulk_create/models.py
+++ b/tests/bulk_create/models.py
@@ -19,8 +19,8 @@ class Country(models.Model):
class Meta:
constraints = [
models.UniqueConstraint(
- fields=['iso_two_letter', 'name'],
- name='country_name_iso_unique',
+ fields=["iso_two_letter", "name"],
+ name="country_name_iso_unique",
),
]
@@ -90,33 +90,45 @@ class BigAutoFieldModel(models.Model):
class NullableFields(models.Model):
# Fields in db.backends.oracle.BulkInsertMapper
big_int_filed = models.BigIntegerField(null=True, default=1)
- binary_field = models.BinaryField(null=True, default=b'data')
+ binary_field = models.BinaryField(null=True, default=b"data")
date_field = models.DateField(null=True, default=timezone.now)
datetime_field = models.DateTimeField(null=True, default=timezone.now)
- decimal_field = models.DecimalField(null=True, max_digits=2, decimal_places=1, default=Decimal('1.1'))
+ decimal_field = models.DecimalField(
+ null=True, max_digits=2, decimal_places=1, default=Decimal("1.1")
+ )
duration_field = models.DurationField(null=True, default=datetime.timedelta(1))
float_field = models.FloatField(null=True, default=3.2)
integer_field = models.IntegerField(null=True, default=2)
null_boolean_field = models.BooleanField(null=True, default=False)
- positive_big_integer_field = models.PositiveBigIntegerField(null=True, default=2 ** 63 - 1)
+ positive_big_integer_field = models.PositiveBigIntegerField(
+ null=True, default=2**63 - 1
+ )
positive_integer_field = models.PositiveIntegerField(null=True, default=3)
- positive_small_integer_field = models.PositiveSmallIntegerField(null=True, default=4)
+ positive_small_integer_field = models.PositiveSmallIntegerField(
+ null=True, default=4
+ )
small_integer_field = models.SmallIntegerField(null=True, default=5)
time_field = models.TimeField(null=True, default=timezone.now)
auto_field = models.ForeignKey(NoFields, on_delete=models.CASCADE, null=True)
- small_auto_field = models.ForeignKey(SmallAutoFieldModel, on_delete=models.CASCADE, null=True)
- big_auto_field = models.ForeignKey(BigAutoFieldModel, on_delete=models.CASCADE, null=True)
+ small_auto_field = models.ForeignKey(
+ SmallAutoFieldModel, on_delete=models.CASCADE, null=True
+ )
+ big_auto_field = models.ForeignKey(
+ BigAutoFieldModel, on_delete=models.CASCADE, null=True
+ )
# Fields not required in BulkInsertMapper
- char_field = models.CharField(null=True, max_length=4, default='char')
- email_field = models.EmailField(null=True, default='user@example.com')
- file_field = models.FileField(null=True, default='file.txt')
- file_path_field = models.FilePathField(path='/tmp', null=True, default='file.txt')
- generic_ip_address_field = models.GenericIPAddressField(null=True, default='127.0.0.1')
+ char_field = models.CharField(null=True, max_length=4, default="char")
+ email_field = models.EmailField(null=True, default="user@example.com")
+ file_field = models.FileField(null=True, default="file.txt")
+ file_path_field = models.FilePathField(path="/tmp", null=True, default="file.txt")
+ generic_ip_address_field = models.GenericIPAddressField(
+ null=True, default="127.0.0.1"
+ )
if Image:
- image_field = models.ImageField(null=True, default='image.jpg')
- slug_field = models.SlugField(null=True, default='slug')
- text_field = models.TextField(null=True, default='text')
- url_field = models.URLField(null=True, default='/')
+ image_field = models.ImageField(null=True, default="image.jpg")
+ slug_field = models.SlugField(null=True, default="slug")
+ text_field = models.TextField(null=True, default="text")
+ url_field = models.URLField(null=True, default="/")
uuid_field = models.UUIDField(null=True, default=uuid.uuid4)
diff --git a/tests/bulk_create/tests.py b/tests/bulk_create/tests.py
index 8040f16dcf..a01e50f4cb 100644
--- a/tests/bulk_create/tests.py
+++ b/tests/bulk_create/tests.py
@@ -3,19 +3,36 @@ from operator import attrgetter
from django.core.exceptions import FieldDoesNotExist
from django.db import (
- IntegrityError, NotSupportedError, OperationalError, ProgrammingError,
+ IntegrityError,
+ NotSupportedError,
+ OperationalError,
+ ProgrammingError,
connection,
)
from django.db.models import FileField, Value
from django.db.models.functions import Lower
from django.test import (
- TestCase, override_settings, skipIfDBFeature, skipUnlessDBFeature,
+ TestCase,
+ override_settings,
+ skipIfDBFeature,
+ skipUnlessDBFeature,
)
from .models import (
- BigAutoFieldModel, Country, NoFields, NullableFields, Pizzeria,
- ProxyCountry, ProxyMultiCountry, ProxyMultiProxyCountry, ProxyProxyCountry,
- RelatedModel, Restaurant, SmallAutoFieldModel, State, TwoFields,
+ BigAutoFieldModel,
+ Country,
+ NoFields,
+ NullableFields,
+ Pizzeria,
+ ProxyCountry,
+ ProxyMultiCountry,
+ ProxyMultiProxyCountry,
+ ProxyProxyCountry,
+ RelatedModel,
+ Restaurant,
+ SmallAutoFieldModel,
+ State,
+ TwoFields,
UpsertConflict,
)
@@ -26,142 +43,179 @@ class BulkCreateTests(TestCase):
Country(name="United States of America", iso_two_letter="US"),
Country(name="The Netherlands", iso_two_letter="NL"),
Country(name="Germany", iso_two_letter="DE"),
- Country(name="Czech Republic", iso_two_letter="CZ")
+ Country(name="Czech Republic", iso_two_letter="CZ"),
]
def test_simple(self):
created = Country.objects.bulk_create(self.data)
self.assertEqual(created, self.data)
- self.assertQuerysetEqual(Country.objects.order_by("-name"), [
- "United States of America", "The Netherlands", "Germany", "Czech Republic"
- ], attrgetter("name"))
+ self.assertQuerysetEqual(
+ Country.objects.order_by("-name"),
+ [
+ "United States of America",
+ "The Netherlands",
+ "Germany",
+ "Czech Republic",
+ ],
+ attrgetter("name"),
+ )
created = Country.objects.bulk_create([])
self.assertEqual(created, [])
self.assertEqual(Country.objects.count(), 4)
- @skipUnlessDBFeature('has_bulk_insert')
+ @skipUnlessDBFeature("has_bulk_insert")
def test_efficiency(self):
with self.assertNumQueries(1):
Country.objects.bulk_create(self.data)
- @skipUnlessDBFeature('has_bulk_insert')
+ @skipUnlessDBFeature("has_bulk_insert")
def test_long_non_ascii_text(self):
"""
Inserting non-ASCII values with a length in the range 2001 to 4000
characters, i.e. 4002 to 8000 bytes, must be set as a CLOB on Oracle
(#22144).
"""
- Country.objects.bulk_create([Country(description='Ж' * 3000)])
+ Country.objects.bulk_create([Country(description="Ж" * 3000)])
self.assertEqual(Country.objects.count(), 1)
- @skipUnlessDBFeature('has_bulk_insert')
+ @skipUnlessDBFeature("has_bulk_insert")
def test_long_and_short_text(self):
- Country.objects.bulk_create([
- Country(description='a' * 4001, iso_two_letter='A'),
- Country(description='a', iso_two_letter='B'),
- Country(description='Ж' * 2001, iso_two_letter='C'),
- Country(description='Ж', iso_two_letter='D'),
- ])
+ Country.objects.bulk_create(
+ [
+ Country(description="a" * 4001, iso_two_letter="A"),
+ Country(description="a", iso_two_letter="B"),
+ Country(description="Ж" * 2001, iso_two_letter="C"),
+ Country(description="Ж", iso_two_letter="D"),
+ ]
+ )
self.assertEqual(Country.objects.count(), 4)
def test_multi_table_inheritance_unsupported(self):
expected_message = "Can't bulk create a multi-table inherited model"
with self.assertRaisesMessage(ValueError, expected_message):
- Pizzeria.objects.bulk_create([
- Pizzeria(name="The Art of Pizza"),
- ])
+ Pizzeria.objects.bulk_create(
+ [
+ Pizzeria(name="The Art of Pizza"),
+ ]
+ )
with self.assertRaisesMessage(ValueError, expected_message):
- ProxyMultiCountry.objects.bulk_create([
- ProxyMultiCountry(name="Fillory", iso_two_letter="FL"),
- ])
+ ProxyMultiCountry.objects.bulk_create(
+ [
+ ProxyMultiCountry(name="Fillory", iso_two_letter="FL"),
+ ]
+ )
with self.assertRaisesMessage(ValueError, expected_message):
- ProxyMultiProxyCountry.objects.bulk_create([
- ProxyMultiProxyCountry(name="Fillory", iso_two_letter="FL"),
- ])
+ ProxyMultiProxyCountry.objects.bulk_create(
+ [
+ ProxyMultiProxyCountry(name="Fillory", iso_two_letter="FL"),
+ ]
+ )
def test_proxy_inheritance_supported(self):
- ProxyCountry.objects.bulk_create([
- ProxyCountry(name="Qwghlm", iso_two_letter="QW"),
- Country(name="Tortall", iso_two_letter="TA"),
- ])
- self.assertQuerysetEqual(ProxyCountry.objects.all(), {
- "Qwghlm", "Tortall"
- }, attrgetter("name"), ordered=False)
-
- ProxyProxyCountry.objects.bulk_create([
- ProxyProxyCountry(name="Netherlands", iso_two_letter="NT"),
- ])
- self.assertQuerysetEqual(ProxyProxyCountry.objects.all(), {
- "Qwghlm", "Tortall", "Netherlands",
- }, attrgetter("name"), ordered=False)
+ ProxyCountry.objects.bulk_create(
+ [
+ ProxyCountry(name="Qwghlm", iso_two_letter="QW"),
+ Country(name="Tortall", iso_two_letter="TA"),
+ ]
+ )
+ self.assertQuerysetEqual(
+ ProxyCountry.objects.all(),
+ {"Qwghlm", "Tortall"},
+ attrgetter("name"),
+ ordered=False,
+ )
+
+ ProxyProxyCountry.objects.bulk_create(
+ [
+ ProxyProxyCountry(name="Netherlands", iso_two_letter="NT"),
+ ]
+ )
+ self.assertQuerysetEqual(
+ ProxyProxyCountry.objects.all(),
+ {
+ "Qwghlm",
+ "Tortall",
+ "Netherlands",
+ },
+ attrgetter("name"),
+ ordered=False,
+ )
def test_non_auto_increment_pk(self):
- State.objects.bulk_create([
- State(two_letter_code=s)
- for s in ["IL", "NY", "CA", "ME"]
- ])
- self.assertQuerysetEqual(State.objects.order_by("two_letter_code"), [
- "CA", "IL", "ME", "NY",
- ], attrgetter("two_letter_code"))
-
- @skipUnlessDBFeature('has_bulk_insert')
+ State.objects.bulk_create(
+ [State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]]
+ )
+ self.assertQuerysetEqual(
+ State.objects.order_by("two_letter_code"),
+ [
+ "CA",
+ "IL",
+ "ME",
+ "NY",
+ ],
+ attrgetter("two_letter_code"),
+ )
+
+ @skipUnlessDBFeature("has_bulk_insert")
def test_non_auto_increment_pk_efficiency(self):
with self.assertNumQueries(1):
- State.objects.bulk_create([
- State(two_letter_code=s)
- for s in ["IL", "NY", "CA", "ME"]
- ])
- self.assertQuerysetEqual(State.objects.order_by("two_letter_code"), [
- "CA", "IL", "ME", "NY",
- ], attrgetter("two_letter_code"))
-
- @skipIfDBFeature('allows_auto_pk_0')
+ State.objects.bulk_create(
+ [State(two_letter_code=s) for s in ["IL", "NY", "CA", "ME"]]
+ )
+ self.assertQuerysetEqual(
+ State.objects.order_by("two_letter_code"),
+ [
+ "CA",
+ "IL",
+ "ME",
+ "NY",
+ ],
+ attrgetter("two_letter_code"),
+ )
+
+ @skipIfDBFeature("allows_auto_pk_0")
def test_zero_as_autoval(self):
"""
Zero as id for AutoField should raise exception in MySQL, because MySQL
does not allow zero for automatic primary key if the
NO_AUTO_VALUE_ON_ZERO SQL mode is not enabled.
"""
- valid_country = Country(name='Germany', iso_two_letter='DE')
- invalid_country = Country(id=0, name='Poland', iso_two_letter='PL')
- msg = 'The database backend does not accept 0 as a value for AutoField.'
+ valid_country = Country(name="Germany", iso_two_letter="DE")
+ invalid_country = Country(id=0, name="Poland", iso_two_letter="PL")
+ msg = "The database backend does not accept 0 as a value for AutoField."
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create([valid_country, invalid_country])
def test_batch_same_vals(self):
# SQLite had a problem where all the same-valued models were
# collapsed to one insert.
- Restaurant.objects.bulk_create([
- Restaurant(name='foo') for i in range(0, 2)
- ])
+ Restaurant.objects.bulk_create([Restaurant(name="foo") for i in range(0, 2)])
self.assertEqual(Restaurant.objects.count(), 2)
def test_large_batch(self):
- TwoFields.objects.bulk_create([
- TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)
- ])
+ TwoFields.objects.bulk_create(
+ [TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)]
+ )
self.assertEqual(TwoFields.objects.count(), 1001)
self.assertEqual(
- TwoFields.objects.filter(f1__gte=450, f1__lte=550).count(),
- 101)
+ TwoFields.objects.filter(f1__gte=450, f1__lte=550).count(), 101
+ )
self.assertEqual(TwoFields.objects.filter(f2__gte=901).count(), 101)
- @skipUnlessDBFeature('has_bulk_insert')
+ @skipUnlessDBFeature("has_bulk_insert")
def test_large_single_field_batch(self):
# SQLite had a problem with more than 500 UNIONed selects in single
# query.
- Restaurant.objects.bulk_create([
- Restaurant() for i in range(0, 501)
- ])
+ Restaurant.objects.bulk_create([Restaurant() for i in range(0, 501)])
- @skipUnlessDBFeature('has_bulk_insert')
+ @skipUnlessDBFeature("has_bulk_insert")
def test_large_batch_efficiency(self):
with override_settings(DEBUG=True):
connection.queries_log.clear()
- TwoFields.objects.bulk_create([
- TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)
- ])
+ TwoFields.objects.bulk_create(
+ [TwoFields(f1=i, f2=i + 1) for i in range(0, 1001)]
+ )
self.assertLess(len(connection.queries), 10)
def test_large_batch_mixed(self):
@@ -169,10 +223,12 @@ class BulkCreateTests(TestCase):
Test inserting a large batch with objects having primary key set
mixed together with objects without PK set.
"""
- TwoFields.objects.bulk_create([
- TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
- for i in range(100000, 101000)
- ])
+ TwoFields.objects.bulk_create(
+ [
+ TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
+ for i in range(100000, 101000)
+ ]
+ )
self.assertEqual(TwoFields.objects.count(), 1000)
# We can't assume much about the ID's created, except that the above
# created IDs must exist.
@@ -180,7 +236,7 @@ class BulkCreateTests(TestCase):
self.assertEqual(TwoFields.objects.filter(id__in=id_range).count(), 500)
self.assertEqual(TwoFields.objects.exclude(id__in=id_range).count(), 500)
- @skipUnlessDBFeature('has_bulk_insert')
+ @skipUnlessDBFeature("has_bulk_insert")
def test_large_batch_mixed_efficiency(self):
"""
Test inserting a large batch with objects having primary key set
@@ -188,9 +244,12 @@ class BulkCreateTests(TestCase):
"""
with override_settings(DEBUG=True):
connection.queries_log.clear()
- TwoFields.objects.bulk_create([
- TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
- for i in range(100000, 101000)])
+ TwoFields.objects.bulk_create(
+ [
+ TwoFields(id=i if i % 2 == 0 else None, f1=i, f2=i + 1)
+ for i in range(100000, 101000)
+ ]
+ )
self.assertLess(len(connection.queries), 10)
def test_explicit_batch_size(self):
@@ -212,7 +271,7 @@ class BulkCreateTests(TestCase):
NoFields.objects.bulk_create([NoFields() for i in range(2)])
self.assertEqual(NoFields.objects.count(), 2)
- @skipUnlessDBFeature('has_bulk_insert')
+ @skipUnlessDBFeature("has_bulk_insert")
def test_explicit_batch_size_efficiency(self):
objs = [TwoFields(f1=i, f2=i) for i in range(0, 100)]
with self.assertNumQueries(2):
@@ -221,50 +280,59 @@ class BulkCreateTests(TestCase):
with self.assertNumQueries(1):
TwoFields.objects.bulk_create(objs, len(objs))
- @skipUnlessDBFeature('has_bulk_insert')
+ @skipUnlessDBFeature("has_bulk_insert")
def test_explicit_batch_size_respects_max_batch_size(self):
- objs = [Country(name=f'Country {i}') for i in range(1000)]
- fields = ['name', 'iso_two_letter', 'description']
+ objs = [Country(name=f"Country {i}") for i in range(1000)]
+ fields = ["name", "iso_two_letter", "description"]
max_batch_size = max(connection.ops.bulk_batch_size(fields, objs), 1)
with self.assertNumQueries(ceil(len(objs) / max_batch_size)):
Country.objects.bulk_create(objs, batch_size=max_batch_size + 1)
- @skipUnlessDBFeature('has_bulk_insert')
+ @skipUnlessDBFeature("has_bulk_insert")
def test_bulk_insert_expressions(self):
- Restaurant.objects.bulk_create([
- Restaurant(name="Sam's Shake Shack"),
- Restaurant(name=Lower(Value("Betty's Beetroot Bar")))
- ])
+ Restaurant.objects.bulk_create(
+ [
+ Restaurant(name="Sam's Shake Shack"),
+ Restaurant(name=Lower(Value("Betty's Beetroot Bar"))),
+ ]
+ )
bbb = Restaurant.objects.filter(name="betty's beetroot bar")
self.assertEqual(bbb.count(), 1)
- @skipUnlessDBFeature('has_bulk_insert')
+ @skipUnlessDBFeature("has_bulk_insert")
def test_bulk_insert_nullable_fields(self):
fk_to_auto_fields = {
- 'auto_field': NoFields.objects.create(),
- 'small_auto_field': SmallAutoFieldModel.objects.create(),
- 'big_auto_field': BigAutoFieldModel.objects.create(),
+ "auto_field": NoFields.objects.create(),
+ "small_auto_field": SmallAutoFieldModel.objects.create(),
+ "big_auto_field": BigAutoFieldModel.objects.create(),
}
# NULL can be mixed with other values in nullable fields
- nullable_fields = [field for field in NullableFields._meta.get_fields() if field.name != 'id']
- NullableFields.objects.bulk_create([
- NullableFields(**{**fk_to_auto_fields, field.name: None})
- for field in nullable_fields
- ])
+ nullable_fields = [
+ field for field in NullableFields._meta.get_fields() if field.name != "id"
+ ]
+ NullableFields.objects.bulk_create(
+ [
+ NullableFields(**{**fk_to_auto_fields, field.name: None})
+ for field in nullable_fields
+ ]
+ )
self.assertEqual(NullableFields.objects.count(), len(nullable_fields))
for field in nullable_fields:
with self.subTest(field=field):
- field_value = '' if isinstance(field, FileField) else None
- self.assertEqual(NullableFields.objects.filter(**{field.name: field_value}).count(), 1)
+ field_value = "" if isinstance(field, FileField) else None
+ self.assertEqual(
+ NullableFields.objects.filter(**{field.name: field_value}).count(),
+ 1,
+ )
- @skipUnlessDBFeature('can_return_rows_from_bulk_insert')
+ @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
def test_set_pk_and_insert_single_item(self):
with self.assertNumQueries(1):
countries = Country.objects.bulk_create([self.data[0]])
self.assertEqual(len(countries), 1)
self.assertEqual(Country.objects.get(pk=countries[0].pk), countries[0])
- @skipUnlessDBFeature('can_return_rows_from_bulk_insert')
+ @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
def test_set_pk_and_query_efficiency(self):
with self.assertNumQueries(1):
countries = Country.objects.bulk_create(self.data)
@@ -274,10 +342,10 @@ class BulkCreateTests(TestCase):
self.assertEqual(Country.objects.get(pk=countries[2].pk), countries[2])
self.assertEqual(Country.objects.get(pk=countries[3].pk), countries[3])
- @skipUnlessDBFeature('can_return_rows_from_bulk_insert')
+ @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
def test_set_state(self):
- country_nl = Country(name='Netherlands', iso_two_letter='NL')
- country_be = Country(name='Belgium', iso_two_letter='BE')
+ country_nl = Country(name="Netherlands", iso_two_letter="NL")
+ country_be = Country(name="Belgium", iso_two_letter="BE")
Country.objects.bulk_create([country_nl])
country_be.save()
# Objects save via bulk_create() and save() should have equal state.
@@ -285,21 +353,21 @@ class BulkCreateTests(TestCase):
self.assertEqual(country_nl._state.db, country_be._state.db)
def test_set_state_with_pk_specified(self):
- state_ca = State(two_letter_code='CA')
- state_ny = State(two_letter_code='NY')
+ state_ca = State(two_letter_code="CA")
+ state_ny = State(two_letter_code="NY")
State.objects.bulk_create([state_ca])
state_ny.save()
# Objects save via bulk_create() and save() should have equal state.
self.assertEqual(state_ca._state.adding, state_ny._state.adding)
self.assertEqual(state_ca._state.db, state_ny._state.db)
- @skipIfDBFeature('supports_ignore_conflicts')
+ @skipIfDBFeature("supports_ignore_conflicts")
def test_ignore_conflicts_value_error(self):
- message = 'This database backend does not support ignoring conflicts.'
+ message = "This database backend does not support ignoring conflicts."
with self.assertRaisesMessage(NotSupportedError, message):
TwoFields.objects.bulk_create(self.data, ignore_conflicts=True)
- @skipUnlessDBFeature('supports_ignore_conflicts')
+ @skipUnlessDBFeature("supports_ignore_conflicts")
def test_ignore_conflicts_ignore(self):
data = [
TwoFields(f1=1, f2=1),
@@ -320,7 +388,9 @@ class BulkCreateTests(TestCase):
self.assertIsNone(conflicting_objects[1].pk)
# New objects are created and conflicts are ignored.
new_object = TwoFields(f1=4, f2=4)
- TwoFields.objects.bulk_create(conflicting_objects + [new_object], ignore_conflicts=True)
+ TwoFields.objects.bulk_create(
+ conflicting_objects + [new_object], ignore_conflicts=True
+ )
self.assertEqual(TwoFields.objects.count(), 4)
self.assertIsNone(new_object.pk)
# Without ignore_conflicts=True, there's a problem.
@@ -335,7 +405,7 @@ class BulkCreateTests(TestCase):
child = NullableFields.objects.get(integer_field=88)
self.assertEqual(child.auto_field, parent)
- @skipUnlessDBFeature('can_return_rows_from_bulk_insert')
+ @skipUnlessDBFeature("can_return_rows_from_bulk_insert")
def test_nullable_fk_after_parent_bulk_create(self):
parent = NoFields()
child = NullableFields(auto_field=parent, integer_field=88)
@@ -354,19 +424,19 @@ class BulkCreateTests(TestCase):
NullableFields.objects.bulk_create([NullableFields(auto_field=parent)])
def test_invalid_batch_size_exception(self):
- msg = 'Batch size must be a positive integer.'
+ msg = "Batch size must be a positive integer."
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create([], batch_size=-1)
- @skipIfDBFeature('supports_update_conflicts')
+ @skipIfDBFeature("supports_update_conflicts")
def test_update_conflicts_unsupported(self):
- msg = 'This database backend does not support updating conflicts.'
+ msg = "This database backend does not support updating conflicts."
with self.assertRaisesMessage(NotSupportedError, msg):
Country.objects.bulk_create(self.data, update_conflicts=True)
- @skipUnlessDBFeature('supports_ignore_conflicts', 'supports_update_conflicts')
+ @skipUnlessDBFeature("supports_ignore_conflicts", "supports_update_conflicts")
def test_ignore_update_conflicts_exclusive(self):
- msg = 'ignore_conflicts and update_conflicts are mutually exclusive'
+ msg = "ignore_conflicts and update_conflicts are mutually exclusive"
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create(
self.data,
@@ -374,155 +444,166 @@ class BulkCreateTests(TestCase):
update_conflicts=True,
)
- @skipUnlessDBFeature('supports_update_conflicts')
+ @skipUnlessDBFeature("supports_update_conflicts")
def test_update_conflicts_no_update_fields(self):
msg = (
- 'Fields that will be updated when a row insertion fails on '
- 'conflicts must be provided.'
+ "Fields that will be updated when a row insertion fails on "
+ "conflicts must be provided."
)
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create(self.data, update_conflicts=True)
- @skipUnlessDBFeature('supports_update_conflicts')
- @skipIfDBFeature('supports_update_conflicts_with_target')
+ @skipUnlessDBFeature("supports_update_conflicts")
+ @skipIfDBFeature("supports_update_conflicts_with_target")
def test_update_conflicts_unique_field_unsupported(self):
msg = (
- 'This database backend does not support updating conflicts with '
- 'specifying unique fields that can trigger the upsert.'
+ "This database backend does not support updating conflicts with "
+ "specifying unique fields that can trigger the upsert."
)
with self.assertRaisesMessage(NotSupportedError, msg):
TwoFields.objects.bulk_create(
[TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
update_conflicts=True,
- update_fields=['f2'],
- unique_fields=['f1'],
+ update_fields=["f2"],
+ unique_fields=["f1"],
)
- @skipUnlessDBFeature('supports_update_conflicts')
+ @skipUnlessDBFeature("supports_update_conflicts")
def test_update_conflicts_nonexistent_update_fields(self):
unique_fields = None
if connection.features.supports_update_conflicts_with_target:
- unique_fields = ['f1']
+ unique_fields = ["f1"]
msg = "TwoFields has no field named 'nonexistent'"
with self.assertRaisesMessage(FieldDoesNotExist, msg):
TwoFields.objects.bulk_create(
[TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
update_conflicts=True,
- update_fields=['nonexistent'],
+ update_fields=["nonexistent"],
unique_fields=unique_fields,
)
@skipUnlessDBFeature(
- 'supports_update_conflicts', 'supports_update_conflicts_with_target',
+ "supports_update_conflicts",
+ "supports_update_conflicts_with_target",
)
def test_update_conflicts_unique_fields_required(self):
- msg = 'Unique fields that can trigger the upsert must be provided.'
+ msg = "Unique fields that can trigger the upsert must be provided."
with self.assertRaisesMessage(ValueError, msg):
TwoFields.objects.bulk_create(
[TwoFields(f1=1, f2=1), TwoFields(f1=2, f2=2)],
update_conflicts=True,
- update_fields=['f1'],
+ update_fields=["f1"],
)
@skipUnlessDBFeature(
- 'supports_update_conflicts', 'supports_update_conflicts_with_target',
+ "supports_update_conflicts",
+ "supports_update_conflicts_with_target",
)
def test_update_conflicts_invalid_update_fields(self):
- msg = (
- 'bulk_create() can only be used with concrete fields in update_fields.'
- )
+ msg = "bulk_create() can only be used with concrete fields in update_fields."
# Reverse one-to-one relationship.
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create(
self.data,
update_conflicts=True,
- update_fields=['relatedmodel'],
- unique_fields=['pk'],
+ update_fields=["relatedmodel"],
+ unique_fields=["pk"],
)
# Many-to-many relationship.
with self.assertRaisesMessage(ValueError, msg):
RelatedModel.objects.bulk_create(
[RelatedModel(country=self.data[0])],
update_conflicts=True,
- update_fields=['big_auto_fields'],
- unique_fields=['country'],
+ update_fields=["big_auto_fields"],
+ unique_fields=["country"],
)
@skipUnlessDBFeature(
- 'supports_update_conflicts', 'supports_update_conflicts_with_target',
+ "supports_update_conflicts",
+ "supports_update_conflicts_with_target",
)
def test_update_conflicts_pk_in_update_fields(self):
- msg = 'bulk_create() cannot be used with primary keys in update_fields.'
+ msg = "bulk_create() cannot be used with primary keys in update_fields."
with self.assertRaisesMessage(ValueError, msg):
BigAutoFieldModel.objects.bulk_create(
[BigAutoFieldModel()],
update_conflicts=True,
- update_fields=['id'],
- unique_fields=['id'],
+ update_fields=["id"],
+ unique_fields=["id"],
)
@skipUnlessDBFeature(
- 'supports_update_conflicts', 'supports_update_conflicts_with_target',
+ "supports_update_conflicts",
+ "supports_update_conflicts_with_target",
)
def test_update_conflicts_invalid_unique_fields(self):
- msg = (
- 'bulk_create() can only be used with concrete fields in unique_fields.'
- )
+ msg = "bulk_create() can only be used with concrete fields in unique_fields."
# Reverse one-to-one relationship.
with self.assertRaisesMessage(ValueError, msg):
Country.objects.bulk_create(
self.data,
update_conflicts=True,
- update_fields=['name'],
- unique_fields=['relatedmodel'],
+ update_fields=["name"],
+ unique_fields=["relatedmodel"],
)
# Many-to-many relationship.
with self.assertRaisesMessage(ValueError, msg):
RelatedModel.objects.bulk_create(
[RelatedModel(country=self.data[0])],
update_conflicts=True,
- update_fields=['name'],
- unique_fields=['big_auto_fields'],
+ update_fields=["name"],
+ unique_fields=["big_auto_fields"],
)
def _test_update_conflicts_two_fields(self, unique_fields):
- TwoFields.objects.bulk_create([
- TwoFields(f1=1, f2=1, name='a'),
- TwoFields(f1=2, f2=2, name='b'),
- ])
+ TwoFields.objects.bulk_create(
+ [
+ TwoFields(f1=1, f2=1, name="a"),
+ TwoFields(f1=2, f2=2, name="b"),
+ ]
+ )
self.assertEqual(TwoFields.objects.count(), 2)
conflicting_objects = [
- TwoFields(f1=1, f2=1, name='c'),
- TwoFields(f1=2, f2=2, name='d'),
+ TwoFields(f1=1, f2=1, name="c"),
+ TwoFields(f1=2, f2=2, name="d"),
]
TwoFields.objects.bulk_create(
conflicting_objects,
update_conflicts=True,
unique_fields=unique_fields,
- update_fields=['name'],
+ update_fields=["name"],
)
self.assertEqual(TwoFields.objects.count(), 2)
- self.assertCountEqual(TwoFields.objects.values('f1', 'f2', 'name'), [
- {'f1': 1, 'f2': 1, 'name': 'c'},
- {'f1': 2, 'f2': 2, 'name': 'd'},
- ])
+ self.assertCountEqual(
+ TwoFields.objects.values("f1", "f2", "name"),
+ [
+ {"f1": 1, "f2": 1, "name": "c"},
+ {"f1": 2, "f2": 2, "name": "d"},
+ ],
+ )
- @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target')
+ @skipUnlessDBFeature(
+ "supports_update_conflicts", "supports_update_conflicts_with_target"
+ )
def test_update_conflicts_two_fields_unique_fields_first(self):
- self._test_update_conflicts_two_fields(['f1'])
+ self._test_update_conflicts_two_fields(["f1"])
- @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target')
+ @skipUnlessDBFeature(
+ "supports_update_conflicts", "supports_update_conflicts_with_target"
+ )
def test_update_conflicts_two_fields_unique_fields_second(self):
- self._test_update_conflicts_two_fields(['f2'])
+ self._test_update_conflicts_two_fields(["f2"])
- @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target')
+ @skipUnlessDBFeature(
+ "supports_update_conflicts", "supports_update_conflicts_with_target"
+ )
def test_update_conflicts_two_fields_unique_fields_both(self):
with self.assertRaises((OperationalError, ProgrammingError)):
- self._test_update_conflicts_two_fields(['f1', 'f2'])
+ self._test_update_conflicts_two_fields(["f1", "f2"])
- @skipUnlessDBFeature('supports_update_conflicts')
- @skipIfDBFeature('supports_update_conflicts_with_target')
+ @skipUnlessDBFeature("supports_update_conflicts")
+ @skipIfDBFeature("supports_update_conflicts_with_target")
def test_update_conflicts_two_fields_no_unique_fields(self):
self._test_update_conflicts_two_fields([])
@@ -532,99 +613,129 @@ class BulkCreateTests(TestCase):
new_data = [
# Conflicting countries.
- Country(name='Germany', iso_two_letter='DE', description=(
- 'Germany is a country in Central Europe.'
- )),
- Country(name='Czech Republic', iso_two_letter='CZ', description=(
- 'The Czech Republic is a landlocked country in Central Europe.'
- )),
+ Country(
+ name="Germany",
+ iso_two_letter="DE",
+ description=("Germany is a country in Central Europe."),
+ ),
+ Country(
+ name="Czech Republic",
+ iso_two_letter="CZ",
+ description=(
+ "The Czech Republic is a landlocked country in Central Europe."
+ ),
+ ),
# New countries.
- Country(name='Australia', iso_two_letter='AU'),
- Country(name='Japan', iso_two_letter='JP', description=(
- 'Japan is an island country in East Asia.'
- )),
+ Country(name="Australia", iso_two_letter="AU"),
+ Country(
+ name="Japan",
+ iso_two_letter="JP",
+ description=("Japan is an island country in East Asia."),
+ ),
]
Country.objects.bulk_create(
new_data,
update_conflicts=True,
- update_fields=['description'],
+ update_fields=["description"],
unique_fields=unique_fields,
)
self.assertEqual(Country.objects.count(), 6)
- self.assertCountEqual(Country.objects.values('iso_two_letter', 'description'), [
- {'iso_two_letter': 'US', 'description': ''},
- {'iso_two_letter': 'NL', 'description': ''},
- {'iso_two_letter': 'DE', 'description': (
- 'Germany is a country in Central Europe.'
- )},
- {'iso_two_letter': 'CZ', 'description': (
- 'The Czech Republic is a landlocked country in Central Europe.'
- )},
- {'iso_two_letter': 'AU', 'description': ''},
- {'iso_two_letter': 'JP', 'description': (
- 'Japan is an island country in East Asia.'
- )},
- ])
-
- @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target')
+ self.assertCountEqual(
+ Country.objects.values("iso_two_letter", "description"),
+ [
+ {"iso_two_letter": "US", "description": ""},
+ {"iso_two_letter": "NL", "description": ""},
+ {
+ "iso_two_letter": "DE",
+ "description": ("Germany is a country in Central Europe."),
+ },
+ {
+ "iso_two_letter": "CZ",
+ "description": (
+ "The Czech Republic is a landlocked country in Central Europe."
+ ),
+ },
+ {"iso_two_letter": "AU", "description": ""},
+ {
+ "iso_two_letter": "JP",
+ "description": ("Japan is an island country in East Asia."),
+ },
+ ],
+ )
+
+ @skipUnlessDBFeature(
+ "supports_update_conflicts", "supports_update_conflicts_with_target"
+ )
def test_update_conflicts_unique_two_fields_unique_fields_both(self):
- self._test_update_conflicts_unique_two_fields(['iso_two_letter', 'name'])
+ self._test_update_conflicts_unique_two_fields(["iso_two_letter", "name"])
- @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target')
+ @skipUnlessDBFeature(
+ "supports_update_conflicts", "supports_update_conflicts_with_target"
+ )
def test_update_conflicts_unique_two_fields_unique_fields_one(self):
with self.assertRaises((OperationalError, ProgrammingError)):
- self._test_update_conflicts_unique_two_fields(['iso_two_letter'])
+ self._test_update_conflicts_unique_two_fields(["iso_two_letter"])
- @skipUnlessDBFeature('supports_update_conflicts')
- @skipIfDBFeature('supports_update_conflicts_with_target')
+ @skipUnlessDBFeature("supports_update_conflicts")
+ @skipIfDBFeature("supports_update_conflicts_with_target")
def test_update_conflicts_unique_two_fields_unique_no_unique_fields(self):
self._test_update_conflicts_unique_two_fields([])
def _test_update_conflicts(self, unique_fields):
- UpsertConflict.objects.bulk_create([
- UpsertConflict(number=1, rank=1, name='John'),
- UpsertConflict(number=2, rank=2, name='Mary'),
- UpsertConflict(number=3, rank=3, name='Hannah'),
- ])
+ UpsertConflict.objects.bulk_create(
+ [
+ UpsertConflict(number=1, rank=1, name="John"),
+ UpsertConflict(number=2, rank=2, name="Mary"),
+ UpsertConflict(number=3, rank=3, name="Hannah"),
+ ]
+ )
self.assertEqual(UpsertConflict.objects.count(), 3)
conflicting_objects = [
- UpsertConflict(number=1, rank=4, name='Steve'),
- UpsertConflict(number=2, rank=2, name='Olivia'),
- UpsertConflict(number=3, rank=1, name='Hannah'),
+ UpsertConflict(number=1, rank=4, name="Steve"),
+ UpsertConflict(number=2, rank=2, name="Olivia"),
+ UpsertConflict(number=3, rank=1, name="Hannah"),
]
UpsertConflict.objects.bulk_create(
conflicting_objects,
update_conflicts=True,
- update_fields=['name', 'rank'],
+ update_fields=["name", "rank"],
unique_fields=unique_fields,
)
self.assertEqual(UpsertConflict.objects.count(), 3)
- self.assertCountEqual(UpsertConflict.objects.values('number', 'rank', 'name'), [
- {'number': 1, 'rank': 4, 'name': 'Steve'},
- {'number': 2, 'rank': 2, 'name': 'Olivia'},
- {'number': 3, 'rank': 1, 'name': 'Hannah'},
- ])
+ self.assertCountEqual(
+ UpsertConflict.objects.values("number", "rank", "name"),
+ [
+ {"number": 1, "rank": 4, "name": "Steve"},
+ {"number": 2, "rank": 2, "name": "Olivia"},
+ {"number": 3, "rank": 1, "name": "Hannah"},
+ ],
+ )
UpsertConflict.objects.bulk_create(
- conflicting_objects + [UpsertConflict(number=4, rank=4, name='Mark')],
+ conflicting_objects + [UpsertConflict(number=4, rank=4, name="Mark")],
update_conflicts=True,
- update_fields=['name', 'rank'],
+ update_fields=["name", "rank"],
unique_fields=unique_fields,
)
self.assertEqual(UpsertConflict.objects.count(), 4)
- self.assertCountEqual(UpsertConflict.objects.values('number', 'rank', 'name'), [
- {'number': 1, 'rank': 4, 'name': 'Steve'},
- {'number': 2, 'rank': 2, 'name': 'Olivia'},
- {'number': 3, 'rank': 1, 'name': 'Hannah'},
- {'number': 4, 'rank': 4, 'name': 'Mark'},
- ])
-
- @skipUnlessDBFeature('supports_update_conflicts', 'supports_update_conflicts_with_target')
+ self.assertCountEqual(
+ UpsertConflict.objects.values("number", "rank", "name"),
+ [
+ {"number": 1, "rank": 4, "name": "Steve"},
+ {"number": 2, "rank": 2, "name": "Olivia"},
+ {"number": 3, "rank": 1, "name": "Hannah"},
+ {"number": 4, "rank": 4, "name": "Mark"},
+ ],
+ )
+
+ @skipUnlessDBFeature(
+ "supports_update_conflicts", "supports_update_conflicts_with_target"
+ )
def test_update_conflicts_unique_fields(self):
- self._test_update_conflicts(unique_fields=['number'])
+ self._test_update_conflicts(unique_fields=["number"])
- @skipUnlessDBFeature('supports_update_conflicts')
- @skipIfDBFeature('supports_update_conflicts_with_target')
+ @skipUnlessDBFeature("supports_update_conflicts")
+ @skipIfDBFeature("supports_update_conflicts_with_target")
def test_update_conflicts_no_unique_fields(self):
self._test_update_conflicts([])