diff options
author | django-bot <ops@djangoproject.com> | 2022-02-03 20:24:19 +0100 |
---|---|---|
committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-02-07 20:37:05 +0100 |
commit | 9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch) | |
tree | f0506b668a013d0063e5fba3dbf4863b466713ba /tests/delete | |
parent | f68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff) | |
download | django-9c19aff7c7561e3a82978a272ecdaad40dda5c00.tar.gz |
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/delete')
-rw-r--r-- | tests/delete/models.py | 94 | ||||
-rw-r--r-- | tests/delete/tests.py | 178 |
2 files changed, 170 insertions, 102 deletions
diff --git a/tests/delete/models.py b/tests/delete/models.py index 96ef65c766..4b627712bb 100644 --- a/tests/delete/models.py +++ b/tests/delete/models.py @@ -1,6 +1,4 @@ -from django.contrib.contenttypes.fields import ( - GenericForeignKey, GenericRelation, -) +from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation from django.contrib.contenttypes.models import ContentType from django.db import models @@ -42,27 +40,50 @@ class A(models.Model): name = models.CharField(max_length=30) auto = models.ForeignKey(R, models.CASCADE, related_name="auto_set") - auto_nullable = models.ForeignKey(R, models.CASCADE, null=True, related_name='auto_nullable_set') - setvalue = models.ForeignKey(R, models.SET(get_default_r), related_name='setvalue') - setnull = models.ForeignKey(R, models.SET_NULL, null=True, related_name='setnull_set') - setdefault = models.ForeignKey(R, models.SET_DEFAULT, default=get_default_r, related_name='setdefault_set') + auto_nullable = models.ForeignKey( + R, models.CASCADE, null=True, related_name="auto_nullable_set" + ) + setvalue = models.ForeignKey(R, models.SET(get_default_r), related_name="setvalue") + setnull = models.ForeignKey( + R, models.SET_NULL, null=True, related_name="setnull_set" + ) + setdefault = models.ForeignKey( + R, models.SET_DEFAULT, default=get_default_r, related_name="setdefault_set" + ) setdefault_none = models.ForeignKey( - R, models.SET_DEFAULT, - default=None, null=True, related_name='setnull_nullable_set', + R, + models.SET_DEFAULT, + default=None, + null=True, + related_name="setnull_nullable_set", + ) + cascade = models.ForeignKey(R, models.CASCADE, related_name="cascade_set") + cascade_nullable = models.ForeignKey( + R, models.CASCADE, null=True, related_name="cascade_nullable_set" + ) + protect = models.ForeignKey( + R, models.PROTECT, null=True, related_name="protect_set" + ) + restrict = models.ForeignKey( + R, models.RESTRICT, null=True, related_name="restrict_set" + ) + donothing = models.ForeignKey( + R, models.DO_NOTHING, null=True, related_name="donothing_set" ) - cascade = models.ForeignKey(R, models.CASCADE, related_name='cascade_set') - cascade_nullable = models.ForeignKey(R, models.CASCADE, null=True, related_name='cascade_nullable_set') - protect = models.ForeignKey(R, models.PROTECT, null=True, related_name='protect_set') - restrict = models.ForeignKey(R, models.RESTRICT, null=True, related_name='restrict_set') - donothing = models.ForeignKey(R, models.DO_NOTHING, null=True, related_name='donothing_set') child = models.ForeignKey(RChild, models.CASCADE, related_name="child") - child_setnull = models.ForeignKey(RChild, models.SET_NULL, null=True, related_name="child_setnull") - cascade_p = models.ForeignKey(P, models.CASCADE, related_name='cascade_p_set', null=True) + child_setnull = models.ForeignKey( + RChild, models.SET_NULL, null=True, related_name="child_setnull" + ) + cascade_p = models.ForeignKey( + P, models.CASCADE, related_name="cascade_p_set", null=True + ) # A OneToOneField is just a ForeignKey unique=True, so we don't duplicate # all the tests; just one smoke test to ensure on_delete works for it as # well. - o2o_setnull = models.ForeignKey(R, models.SET_NULL, null=True, related_name="o2o_nullable_set") + o2o_setnull = models.ForeignKey( + R, models.SET_NULL, null=True, related_name="o2o_nullable_set" + ) class B(models.Model): @@ -71,9 +92,20 @@ class B(models.Model): def create_a(name): a = A(name=name) - for name in ('auto', 'auto_nullable', 'setvalue', 'setnull', 'setdefault', - 'setdefault_none', 'cascade', 'cascade_nullable', 'protect', - 'restrict', 'donothing', 'o2o_setnull'): + for name in ( + "auto", + "auto_nullable", + "setvalue", + "setnull", + "setdefault", + "setdefault_none", + "cascade", + "cascade_nullable", + "protect", + "restrict", + "donothing", + "o2o_setnull", + ): r = R.objects.create() setattr(a, name, r) a.child = RChild.objects.create() @@ -85,7 +117,9 @@ def create_a(name): class M(models.Model): m2m = models.ManyToManyField(R, related_name="m_set") m2m_through = models.ManyToManyField(R, through="MR", related_name="m_through_set") - m2m_through_null = models.ManyToManyField(R, through="MRNull", related_name="m_through_null_set") + m2m_through_null = models.ManyToManyField( + R, through="MRNull", related_name="m_through_null_set" + ) class MR(models.Model): @@ -141,7 +175,7 @@ class Base(models.Model): class RelToBase(models.Model): - base = models.ForeignKey(Base, models.DO_NOTHING, related_name='rels') + base = models.ForeignKey(Base, models.DO_NOTHING, related_name="rels") class Origin(models.Model): @@ -157,13 +191,13 @@ class Referrer(models.Model): class SecondReferrer(models.Model): referrer = models.ForeignKey(Referrer, models.CASCADE) other_referrer = models.ForeignKey( - Referrer, models.CASCADE, to_field='unique_field', related_name='+' + Referrer, models.CASCADE, to_field="unique_field", related_name="+" ) class DeleteTop(models.Model): - b1 = GenericRelation('GenericB1') - b2 = GenericRelation('GenericB2') + b1 = GenericRelation("GenericB1") + b2 = GenericRelation("GenericB2") class B1(models.Model): @@ -186,14 +220,14 @@ class DeleteBottom(models.Model): class GenericB1(models.Model): content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) object_id = models.PositiveIntegerField() - generic_delete_top = GenericForeignKey('content_type', 'object_id') + generic_delete_top = GenericForeignKey("content_type", "object_id") class GenericB2(models.Model): content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) object_id = models.PositiveIntegerField() - generic_delete_top = GenericForeignKey('content_type', 'object_id') - generic_delete_bottom = GenericRelation('GenericDeleteBottom') + generic_delete_top = GenericForeignKey("content_type", "object_id") + generic_delete_bottom = GenericRelation("GenericDeleteBottom") class GenericDeleteBottom(models.Model): @@ -204,4 +238,6 @@ class GenericDeleteBottom(models.Model): class GenericDeleteBottomParent(models.Model): - generic_delete_bottom = models.ForeignKey(GenericDeleteBottom, on_delete=models.CASCADE) + generic_delete_bottom = models.ForeignKey( + GenericDeleteBottom, on_delete=models.CASCADE + ) diff --git a/tests/delete/tests.py b/tests/delete/tests.py index e5dde8047a..597589b836 100644 --- a/tests/delete/tests.py +++ b/tests/delete/tests.py @@ -7,10 +7,38 @@ from django.db.models.sql.constants import GET_ITERATOR_CHUNK_SIZE from django.test import TestCase, skipIfDBFeature, skipUnlessDBFeature from .models import ( - B1, B2, B3, MR, A, Avatar, B, Base, Child, DeleteBottom, DeleteTop, - GenericB1, GenericB2, GenericDeleteBottom, HiddenUser, HiddenUserProfile, - M, M2MFrom, M2MTo, MRNull, Origin, P, Parent, R, RChild, RChildChild, - Referrer, S, T, User, create_a, get_default_r, + B1, + B2, + B3, + MR, + A, + Avatar, + B, + Base, + Child, + DeleteBottom, + DeleteTop, + GenericB1, + GenericB2, + GenericDeleteBottom, + HiddenUser, + HiddenUserProfile, + M, + M2MFrom, + M2MTo, + MRNull, + Origin, + P, + Parent, + R, + RChild, + RChildChild, + Referrer, + S, + T, + User, + create_a, + get_default_r, ) @@ -19,58 +47,58 @@ class OnDeleteTests(TestCase): self.DEFAULT = get_default_r() def test_auto(self): - a = create_a('auto') + a = create_a("auto") a.auto.delete() - self.assertFalse(A.objects.filter(name='auto').exists()) + self.assertFalse(A.objects.filter(name="auto").exists()) def test_non_callable(self): - msg = 'on_delete must be callable.' + msg = "on_delete must be callable." with self.assertRaisesMessage(TypeError, msg): - models.ForeignKey('self', on_delete=None) + models.ForeignKey("self", on_delete=None) with self.assertRaisesMessage(TypeError, msg): - models.OneToOneField('self', on_delete=None) + models.OneToOneField("self", on_delete=None) def test_auto_nullable(self): - a = create_a('auto_nullable') + a = create_a("auto_nullable") a.auto_nullable.delete() - self.assertFalse(A.objects.filter(name='auto_nullable').exists()) + self.assertFalse(A.objects.filter(name="auto_nullable").exists()) def test_setvalue(self): - a = create_a('setvalue') + a = create_a("setvalue") a.setvalue.delete() a = A.objects.get(pk=a.pk) self.assertEqual(self.DEFAULT, a.setvalue.pk) def test_setnull(self): - a = create_a('setnull') + a = create_a("setnull") a.setnull.delete() a = A.objects.get(pk=a.pk) self.assertIsNone(a.setnull) def test_setdefault(self): - a = create_a('setdefault') + a = create_a("setdefault") a.setdefault.delete() a = A.objects.get(pk=a.pk) self.assertEqual(self.DEFAULT, a.setdefault.pk) def test_setdefault_none(self): - a = create_a('setdefault_none') + a = create_a("setdefault_none") a.setdefault_none.delete() a = A.objects.get(pk=a.pk) self.assertIsNone(a.setdefault_none) def test_cascade(self): - a = create_a('cascade') + a = create_a("cascade") a.cascade.delete() - self.assertFalse(A.objects.filter(name='cascade').exists()) + self.assertFalse(A.objects.filter(name="cascade").exists()) def test_cascade_nullable(self): - a = create_a('cascade_nullable') + a = create_a("cascade_nullable") a.cascade_nullable.delete() - self.assertFalse(A.objects.filter(name='cascade_nullable').exists()) + self.assertFalse(A.objects.filter(name="cascade_nullable").exists()) def test_protect(self): - a = create_a('protect') + a = create_a("protect") msg = ( "Cannot delete some instances of model 'R' because they are " "referenced through protected foreign keys: 'A.protect'." @@ -80,7 +108,7 @@ class OnDeleteTests(TestCase): self.assertEqual(cm.exception.protected_objects, {a}) def test_protect_multiple(self): - a = create_a('protect') + a = create_a("protect") b = B.objects.create(protect=a.protect) msg = ( "Cannot delete some instances of model 'R' because they are " @@ -92,7 +120,7 @@ class OnDeleteTests(TestCase): self.assertEqual(cm.exception.protected_objects, {a, b}) def test_protect_path(self): - a = create_a('protect') + a = create_a("protect") a.protect.p = P.objects.create() a.protect.save() msg = ( @@ -109,10 +137,11 @@ class OnDeleteTests(TestCase): replacement_r = R.objects.create() def check_do_nothing(sender, **kwargs): - obj = kwargs['instance'] + obj = kwargs["instance"] obj.donothing_set.update(donothing=replacement_r) + models.signals.pre_delete.connect(check_do_nothing) - a = create_a('do_nothing') + a = create_a("do_nothing") a.donothing.delete() a = A.objects.get(pk=a.pk) self.assertEqual(replacement_r, a.donothing) @@ -140,19 +169,19 @@ class OnDeleteTests(TestCase): self.assertFalse(RChild.objects.filter(pk=child.pk).exists()) def test_cascade_from_child(self): - a = create_a('child') + a = create_a("child") a.child.delete() - self.assertFalse(A.objects.filter(name='child').exists()) + self.assertFalse(A.objects.filter(name="child").exists()) self.assertFalse(R.objects.filter(pk=a.child_id).exists()) def test_cascade_from_parent(self): - a = create_a('child') + a = create_a("child") R.objects.get(pk=a.child_id).delete() - self.assertFalse(A.objects.filter(name='child').exists()) + self.assertFalse(A.objects.filter(name="child").exists()) self.assertFalse(RChild.objects.filter(pk=a.child_id).exists()) def test_setnull_from_child(self): - a = create_a('child_setnull') + a = create_a("child_setnull") a.child_setnull.delete() self.assertFalse(R.objects.filter(pk=a.child_setnull_id).exists()) @@ -160,7 +189,7 @@ class OnDeleteTests(TestCase): self.assertIsNone(a.child_setnull) def test_setnull_from_parent(self): - a = create_a('child_setnull') + a = create_a("child_setnull") R.objects.get(pk=a.child_setnull_id).delete() self.assertFalse(RChild.objects.filter(pk=a.child_setnull_id).exists()) @@ -168,13 +197,13 @@ class OnDeleteTests(TestCase): self.assertIsNone(a.child_setnull) def test_o2o_setnull(self): - a = create_a('o2o_setnull') + a = create_a("o2o_setnull") a.o2o_setnull.delete() a = A.objects.get(pk=a.pk) self.assertIsNone(a.o2o_setnull) def test_restrict(self): - a = create_a('restrict') + a = create_a("restrict") msg = ( "Cannot delete some instances of model 'R' because they are " "referenced through restricted foreign keys: 'A.restrict'." @@ -184,7 +213,7 @@ class OnDeleteTests(TestCase): self.assertEqual(cm.exception.restricted_objects, {a}) def test_restrict_multiple(self): - a = create_a('restrict') + a = create_a("restrict") b3 = B3.objects.create(restrict=a.restrict) msg = ( "Cannot delete some instances of model 'R' because they are " @@ -196,7 +225,7 @@ class OnDeleteTests(TestCase): self.assertEqual(cm.exception.restricted_objects, {a, b3}) def test_restrict_path_cascade_indirect(self): - a = create_a('restrict') + a = create_a("restrict") a.restrict.p = P.objects.create() a.restrict.save() msg = ( @@ -210,17 +239,17 @@ class OnDeleteTests(TestCase): a.cascade.p = a.restrict.p a.cascade.save() a.restrict.p.delete() - self.assertFalse(A.objects.filter(name='restrict').exists()) + self.assertFalse(A.objects.filter(name="restrict").exists()) self.assertFalse(R.objects.filter(pk=a.restrict_id).exists()) def test_restrict_path_cascade_direct(self): - a = create_a('restrict') + a = create_a("restrict") a.restrict.p = P.objects.create() a.restrict.save() a.cascade_p = a.restrict.p a.save() a.restrict.p.delete() - self.assertFalse(A.objects.filter(name='restrict').exists()) + self.assertFalse(A.objects.filter(name="restrict").exists()) self.assertFalse(R.objects.filter(pk=a.restrict_id).exists()) def test_restrict_path_cascade_indirect_diamond(self): @@ -302,7 +331,7 @@ class DeletionTests(TestCase): r = R.objects.create() m.m2m.add(r) r.delete() - through = M._meta.get_field('m2m').remote_field.through + through = M._meta.get_field("m2m").remote_field.through self.assertFalse(through.objects.exists()) r = R.objects.create() @@ -333,16 +362,16 @@ class DeletionTests(TestCase): related_setnull_sets = [] def pre_delete(sender, **kwargs): - obj = kwargs['instance'] + obj = kwargs["instance"] deleted.append(obj) if isinstance(obj, R): related_setnull_sets.append([a.pk for a in obj.setnull_set.all()]) models.signals.pre_delete.connect(pre_delete) - a = create_a('update_setnull') + a = create_a("update_setnull") a.setnull.delete() - a = create_a('update_cascade') + a = create_a("update_cascade") a.cascade.delete() for obj in deleted: @@ -359,10 +388,10 @@ class DeletionTests(TestCase): post_delete_order = [] def log_post_delete(sender, **kwargs): - pre_delete_order.append((sender, kwargs['instance'].pk)) + pre_delete_order.append((sender, kwargs["instance"].pk)) def log_pre_delete(sender, **kwargs): - post_delete_order.append((sender, kwargs['instance'].pk)) + post_delete_order.append((sender, kwargs["instance"].pk)) models.signals.post_delete.connect(log_post_delete) models.signals.pre_delete.connect(log_pre_delete) @@ -407,9 +436,7 @@ class DeletionTests(TestCase): @skipUnlessDBFeature("can_defer_constraint_checks") def test_can_defer_constraint_checks(self): - u = User.objects.create( - avatar=Avatar.objects.create() - ) + u = User.objects.create(avatar=Avatar.objects.create()) a = Avatar.objects.get(pk=u.avatar_id) # 1 query to find the users for the avatar. # 1 query to delete the user @@ -421,7 +448,8 @@ class DeletionTests(TestCase): calls = [] def noop(*args, **kwargs): - calls.append('') + calls.append("") + models.signals.post_delete.connect(noop, sender=User) self.assertNumQueries(3, a.delete) @@ -432,14 +460,13 @@ class DeletionTests(TestCase): @skipIfDBFeature("can_defer_constraint_checks") def test_cannot_defer_constraint_checks(self): - u = User.objects.create( - avatar=Avatar.objects.create() - ) + u = User.objects.create(avatar=Avatar.objects.create()) # Attach a signal to make sure we will not do fast_deletes. calls = [] def noop(*args, **kwargs): - calls.append('') + calls.append("") + models.signals.post_delete.connect(noop, sender=User) a = Avatar.objects.get(pk=u.avatar_id) @@ -469,7 +496,7 @@ class DeletionTests(TestCase): objs = [Avatar() for i in range(0, TEST_SIZE)] Avatar.objects.bulk_create(objs) # Calculate the number of queries needed. - batch_size = connection.ops.bulk_batch_size(['pk'], objs) + batch_size = connection.ops.bulk_batch_size(["pk"], objs) # The related fetches are done in batches. batches = ceil(len(objs) / batch_size) # One query for Avatar.objects.all() and then one related fast delete for @@ -486,7 +513,7 @@ class DeletionTests(TestCase): for i in range(TEST_SIZE): T.objects.create(s=s) - batch_size = max(connection.ops.bulk_batch_size(['pk'], range(TEST_SIZE)), 1) + batch_size = max(connection.ops.bulk_batch_size(["pk"], range(TEST_SIZE)), 1) # TEST_SIZE / batch_size (select related `T` instances) # + 1 (select related `U` instances) @@ -530,7 +557,9 @@ class DeletionTests(TestCase): QuerySet.delete() should return the number of deleted rows and a dictionary with the number of deletions for each object type. """ - Avatar.objects.bulk_create([Avatar(desc='a'), Avatar(desc='b'), Avatar(desc='c')]) + Avatar.objects.bulk_create( + [Avatar(desc="a"), Avatar(desc="b"), Avatar(desc="c")] + ) avatars_count = Avatar.objects.count() deleted, rows_count = Avatar.objects.all().delete() self.assertEqual(deleted, avatars_count) @@ -600,18 +629,21 @@ class DeletionTests(TestCase): expected_sql = str( Referrer.objects.only( # Both fields are referenced by SecondReferrer. - 'id', 'unique_field', - ).filter(origin__in=[origin]).query + "id", + "unique_field", + ) + .filter(origin__in=[origin]) + .query ) with self.assertNumQueries(2) as ctx: origin.delete() - self.assertEqual(ctx.captured_queries[0]['sql'], expected_sql) + self.assertEqual(ctx.captured_queries[0]["sql"], expected_sql) def receiver(instance, **kwargs): pass # All fields are selected if deletion signals are connected. - for signal_name in ('pre_delete', 'post_delete'): + for signal_name in ("pre_delete", "post_delete"): with self.subTest(signal=signal_name): origin = Origin.objects.create() signal = getattr(models.signals, signal_name) @@ -619,8 +651,8 @@ class DeletionTests(TestCase): with self.assertNumQueries(2) as ctx: origin.delete() self.assertIn( - connection.ops.quote_name('large_field'), - ctx.captured_queries[0]['sql'], + connection.ops.quote_name("large_field"), + ctx.captured_queries[0]["sql"], ) signal.disconnect(receiver, sender=Referrer) @@ -629,14 +661,12 @@ class FastDeleteTests(TestCase): def test_fast_delete_all(self): with self.assertNumQueries(1) as ctx: User.objects.all().delete() - sql = ctx.captured_queries[0]['sql'] + sql = ctx.captured_queries[0]["sql"] # No subqueries is used when performing a full delete. - self.assertNotIn('SELECT', sql) + self.assertNotIn("SELECT", sql) def test_fast_delete_fk(self): - u = User.objects.create( - avatar=Avatar.objects.create() - ) + u = User.objects.create(avatar=Avatar.objects.create()) a = Avatar.objects.get(pk=u.avatar_id) # 1 query to fast-delete the user # 1 query to delete the avatar @@ -668,16 +698,16 @@ class FastDeleteTests(TestCase): def test_fast_delete_instance_set_pk_none(self): u = User.objects.create() # User can be fast-deleted. - collector = Collector(using='default') + collector = Collector(using="default") self.assertTrue(collector.can_fast_delete(u)) u.delete() self.assertIsNone(u.pk) def test_fast_delete_joined_qs(self): - a = Avatar.objects.create(desc='a') + a = Avatar.objects.create(desc="a") User.objects.create(avatar=a) u2 = User.objects.create() - self.assertNumQueries(1, User.objects.filter(avatar__desc='a').delete) + self.assertNumQueries(1, User.objects.filter(avatar__desc="a").delete) self.assertEqual(User.objects.count(), 1) self.assertTrue(User.objects.filter(pk=u2.pk).exists()) @@ -704,7 +734,7 @@ class FastDeleteTests(TestCase): # No problems here - we aren't going to cascade, so we will fast # delete the objects in a single query. self.assertNumQueries(1, User.objects.all().delete) - a = Avatar.objects.create(desc='a') + a = Avatar.objects.create(desc="a") User.objects.bulk_create(User(avatar=a) for i in range(0, 2000)) # We don't hit parameter amount limits for a, so just one query for # that + fast delete of the related objs. @@ -718,7 +748,7 @@ class FastDeleteTests(TestCase): """ with self.assertNumQueries(1): self.assertEqual( - User.objects.filter(avatar__desc='missing').delete(), + User.objects.filter(avatar__desc="missing").delete(), (0, {}), ) @@ -737,8 +767,10 @@ class FastDeleteTests(TestCase): with self.assertNumQueries(1): self.assertEqual( Base.objects.annotate( - rels_count=models.Count('rels'), - ).filter(rels_count=0).delete(), - (1, {'delete.Base': 1}), + rels_count=models.Count("rels"), + ) + .filter(rels_count=0) + .delete(), + (1, {"delete.Base": 1}), ) self.assertIs(Base.objects.exists(), False) |