diff options
author | django-bot <ops@djangoproject.com> | 2022-02-03 20:24:19 +0100 |
---|---|---|
committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-02-07 20:37:05 +0100 |
commit | 9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch) | |
tree | f0506b668a013d0063e5fba3dbf4863b466713ba /tests/queries | |
parent | f68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff) | |
download | django-9c19aff7c7561e3a82978a272ecdaad40dda5c00.tar.gz |
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/queries')
-rw-r--r-- | tests/queries/models.py | 152 | ||||
-rw-r--r-- | tests/queries/test_bulk_update.py | 210 | ||||
-rw-r--r-- | tests/queries/test_contains.py | 10 | ||||
-rw-r--r-- | tests/queries/test_db_returning.py | 24 | ||||
-rw-r--r-- | tests/queries/test_explain.py | 106 | ||||
-rw-r--r-- | tests/queries/test_iterator.py | 18 | ||||
-rw-r--r-- | tests/queries/test_q.py | 77 | ||||
-rw-r--r-- | tests/queries/test_qs_combinators.py | 386 | ||||
-rw-r--r-- | tests/queries/test_query.py | 46 | ||||
-rw-r--r-- | tests/queries/test_sqlcompiler.py | 2 | ||||
-rw-r--r-- | tests/queries/tests.py | 2633 |
11 files changed, 2110 insertions, 1554 deletions
diff --git a/tests/queries/models.py b/tests/queries/models.py index 8e7ee1625c..155b74b0c9 100644 --- a/tests/queries/models.py +++ b/tests/queries/models.py @@ -24,15 +24,18 @@ class NamedCategory(DumbCategory): class Tag(models.Model): name = models.CharField(max_length=10) parent = models.ForeignKey( - 'self', + "self", models.SET_NULL, - blank=True, null=True, - related_name='children', + blank=True, + null=True, + related_name="children", + ) + category = models.ForeignKey( + NamedCategory, models.SET_NULL, null=True, default=None ) - category = models.ForeignKey(NamedCategory, models.SET_NULL, null=True, default=None) class Meta: - ordering = ['name'] + ordering = ["name"] def __str__(self): return self.name @@ -45,7 +48,7 @@ class Note(models.Model): negate = models.BooleanField(default=True) class Meta: - ordering = ['note'] + ordering = ["note"] def __str__(self): return self.note @@ -72,7 +75,7 @@ class ExtraInfo(models.Model): filterable = models.BooleanField(default=True) class Meta: - ordering = ['info'] + ordering = ["info"] def __str__(self): return self.info @@ -84,7 +87,7 @@ class Author(models.Model): extra = models.ForeignKey(ExtraInfo, models.CASCADE) class Meta: - ordering = ['name'] + ordering = ["name"] def __str__(self): return self.name @@ -99,7 +102,7 @@ class Item(models.Model): note = models.ForeignKey(Note, models.CASCADE) class Meta: - ordering = ['-note', 'name'] + ordering = ["-note", "name"] def __str__(self): return self.name @@ -107,7 +110,7 @@ class Item(models.Model): class Report(models.Model): name = models.CharField(max_length=10) - creator = models.ForeignKey(Author, models.SET_NULL, to_field='num', null=True) + creator = models.ForeignKey(Author, models.SET_NULL, to_field="num", null=True) def __str__(self): return self.name @@ -123,10 +126,10 @@ class Ranking(models.Model): class Meta: # A complex ordering specification. Should stress the system a bit. - ordering = ('author__extra__note', 'author__name', 'rank') + ordering = ("author__extra__note", "author__name", "rank") def __str__(self): - return '%d: %s' % (self.rank, self.author.name) + return "%d: %s" % (self.rank, self.author.name) class Cover(models.Model): @@ -134,7 +137,7 @@ class Cover(models.Model): item = models.ForeignKey(Item, models.CASCADE) class Meta: - ordering = ['item'] + ordering = ["item"] def __str__(self): return self.title @@ -148,51 +151,54 @@ class Number(models.Model): def __str__(self): return str(self.num) + # Symmetrical m2m field with a normal field using the reverse accessor name # ("valid"). class Valid(models.Model): valid = models.CharField(max_length=10) - parent = models.ManyToManyField('self') + parent = models.ManyToManyField("self") class Meta: - ordering = ['valid'] + ordering = ["valid"] + # Some funky cross-linked models for testing a couple of infinite recursion # cases. class X(models.Model): - y = models.ForeignKey('Y', models.CASCADE) + y = models.ForeignKey("Y", models.CASCADE) class Y(models.Model): - x1 = models.ForeignKey(X, models.CASCADE, related_name='y1') + x1 = models.ForeignKey(X, models.CASCADE, related_name="y1") + # Some models with a cycle in the default ordering. This would be bad if we # didn't catch the infinite loop. class LoopX(models.Model): - y = models.ForeignKey('LoopY', models.CASCADE) + y = models.ForeignKey("LoopY", models.CASCADE) class Meta: - ordering = ['y'] + ordering = ["y"] class LoopY(models.Model): x = models.ForeignKey(LoopX, models.CASCADE) class Meta: - ordering = ['x'] + ordering = ["x"] class LoopZ(models.Model): - z = models.ForeignKey('self', models.CASCADE) + z = models.ForeignKey("self", models.CASCADE) class Meta: - ordering = ['z'] + ordering = ["z"] # A model and custom default manager combination. @@ -201,7 +207,7 @@ class LoopZ(models.Model): class CustomManager(models.Manager): def get_queryset(self): qs = super().get_queryset() - return qs.filter(public=True, tag__name='t1') + return qs.filter(public=True, tag__name="t1") class ManagedModel(models.Model): @@ -215,6 +221,7 @@ class ManagedModel(models.Model): def __str__(self): return self.data + # An inter-related setup with multiple paths from Child to Detail. @@ -238,6 +245,7 @@ class Child(models.Model): person = models.OneToOneField(Member, models.CASCADE, primary_key=True) parent = models.ForeignKey(Member, models.CASCADE, related_name="children") + # Custom primary keys interfered with ordering in the past. @@ -246,7 +254,7 @@ class CustomPk(models.Model): extra = models.CharField(max_length=10) class Meta: - ordering = ['name', 'extra'] + ordering = ["name", "extra"] class Related(models.Model): @@ -258,6 +266,7 @@ class CustomPkTag(models.Model): custom_pk = models.ManyToManyField(CustomPk) tag = models.CharField(max_length=20) + # An inter-related setup with a model subclass that has a nullable # path to another model, and a return path from that model. @@ -277,6 +286,7 @@ class TvChef(Celebrity): class Fan(models.Model): fan_of = models.ForeignKey(Celebrity, models.CASCADE) + # Multiple foreign keys @@ -303,6 +313,7 @@ class ReservedName(models.Model): def __str__(self): return self.name + # A simpler shared-foreign-key setup that can expose some problems. @@ -320,6 +331,7 @@ class PointerA(models.Model): class PointerB(models.Model): connection = models.ForeignKey(SharedConnection, models.CASCADE) + # Multi-layer ordering @@ -327,7 +339,7 @@ class SingleObject(models.Model): name = models.CharField(max_length=10) class Meta: - ordering = ['name'] + ordering = ["name"] def __str__(self): return self.name @@ -338,7 +350,7 @@ class RelatedObject(models.Model): f = models.IntegerField(null=True) class Meta: - ordering = ['single'] + ordering = ["single"] class Plaything(models.Model): @@ -346,7 +358,7 @@ class Plaything(models.Model): others = models.ForeignKey(RelatedObject, models.SET_NULL, null=True) class Meta: - ordering = ['others'] + ordering = ["others"] def __str__(self): return self.name @@ -382,6 +394,7 @@ class Node(models.Model): def __str__(self): return str(self.num) + # Bug #12252 @@ -393,7 +406,7 @@ class ObjectA(models.Model): def __iter__(self): # Ticket #23721 - assert False, 'type checking should happen without calling model __iter__' + assert False, "type checking should happen without calling model __iter__" class ProxyObjectA(ObjectA): @@ -423,7 +436,9 @@ class ObjectC(models.Model): name = models.CharField(max_length=50) objecta = models.ForeignKey(ObjectA, models.SET_NULL, null=True) objectb = models.ForeignKey(ObjectB, models.SET_NULL, null=True) - childobjecta = models.ForeignKey(ChildObjectA, models.SET_NULL, null=True, related_name='ca_pk') + childobjecta = models.ForeignKey( + ChildObjectA, models.SET_NULL, null=True, related_name="ca_pk" + ) def __str__(self): return self.name @@ -455,7 +470,9 @@ class MixedCaseFieldCategoryItem(models.Model): class MixedCaseDbColumnCategoryItem(models.Model): - category = models.ForeignKey(SimpleCategory, models.CASCADE, db_column='CaTeGoRy_Id') + category = models.ForeignKey( + SimpleCategory, models.CASCADE, db_column="CaTeGoRy_Id" + ) class OneToOneCategory(models.Model): @@ -467,21 +484,27 @@ class OneToOneCategory(models.Model): class CategoryRelationship(models.Model): - first = models.ForeignKey(SimpleCategory, models.CASCADE, related_name='first_rel') - second = models.ForeignKey(SimpleCategory, models.CASCADE, related_name='second_rel') + first = models.ForeignKey(SimpleCategory, models.CASCADE, related_name="first_rel") + second = models.ForeignKey( + SimpleCategory, models.CASCADE, related_name="second_rel" + ) class CommonMixedCaseForeignKeys(models.Model): category = models.ForeignKey(CategoryItem, models.CASCADE) - mixed_case_field_category = models.ForeignKey(MixedCaseFieldCategoryItem, models.CASCADE) - mixed_case_db_column_category = models.ForeignKey(MixedCaseDbColumnCategoryItem, models.CASCADE) + mixed_case_field_category = models.ForeignKey( + MixedCaseFieldCategoryItem, models.CASCADE + ) + mixed_case_db_column_category = models.ForeignKey( + MixedCaseDbColumnCategoryItem, models.CASCADE + ) class NullableName(models.Model): name = models.CharField(max_length=20, null=True) class Meta: - ordering = ['id'] + ordering = ["id"] class ModelD(models.Model): @@ -511,18 +534,22 @@ class Job(models.Model): class JobResponsibilities(models.Model): - job = models.ForeignKey(Job, models.CASCADE, to_field='name') - responsibility = models.ForeignKey('Responsibility', models.CASCADE, to_field='description') + job = models.ForeignKey(Job, models.CASCADE, to_field="name") + responsibility = models.ForeignKey( + "Responsibility", models.CASCADE, to_field="description" + ) class Responsibility(models.Model): description = models.CharField(max_length=20, unique=True) - jobs = models.ManyToManyField(Job, through=JobResponsibilities, - related_name='responsibilities') + jobs = models.ManyToManyField( + Job, through=JobResponsibilities, related_name="responsibilities" + ) def __str__(self): return self.description + # Models for disjunction join promotion low level testing. @@ -565,17 +592,17 @@ class Channel(models.Model): class Book(models.Model): title = models.TextField() - chapter = models.ForeignKey('Chapter', models.CASCADE) + chapter = models.ForeignKey("Chapter", models.CASCADE) class Chapter(models.Model): title = models.TextField() - paragraph = models.ForeignKey('Paragraph', models.CASCADE) + paragraph = models.ForeignKey("Paragraph", models.CASCADE) class Paragraph(models.Model): text = models.TextField() - page = models.ManyToManyField('Page') + page = models.ManyToManyField("Page") class Page(models.Model): @@ -583,30 +610,33 @@ class Page(models.Model): class MyObject(models.Model): - parent = models.ForeignKey('self', models.SET_NULL, null=True, blank=True, related_name='children') + parent = models.ForeignKey( + "self", models.SET_NULL, null=True, blank=True, related_name="children" + ) data = models.CharField(max_length=100) created_at = models.DateTimeField(auto_now_add=True) + # Models for #17600 regressions class Order(models.Model): id = models.IntegerField(primary_key=True) - name = models.CharField(max_length=12, null=True, default='') + name = models.CharField(max_length=12, null=True, default="") class Meta: - ordering = ('pk',) + ordering = ("pk",) def __str__(self): return str(self.pk) class OrderItem(models.Model): - order = models.ForeignKey(Order, models.CASCADE, related_name='items') + order = models.ForeignKey(Order, models.CASCADE, related_name="items") status = models.IntegerField() class Meta: - ordering = ('pk',) + ordering = ("pk",) def __str__(self): return str(self.pk) @@ -618,8 +648,8 @@ class BaseUser(models.Model): class Task(models.Model): title = models.CharField(max_length=10) - owner = models.ForeignKey(BaseUser, models.CASCADE, related_name='owner') - creator = models.ForeignKey(BaseUser, models.CASCADE, related_name='creator') + owner = models.ForeignKey(BaseUser, models.CASCADE, related_name="owner") + creator = models.ForeignKey(BaseUser, models.CASCADE, related_name="creator") note = models.ForeignKey(Note, on_delete=models.CASCADE, null=True, blank=True) def __str__(self): @@ -634,7 +664,7 @@ class Staff(models.Model): class StaffUser(BaseUser): - staff = models.OneToOneField(Staff, models.CASCADE, related_name='user') + staff = models.OneToOneField(Staff, models.CASCADE, related_name="user") def __str__(self): return str(self.staff) @@ -657,7 +687,9 @@ class Person(models.Model): class Company(models.Model): name = models.CharField(max_length=128) - employees = models.ManyToManyField(Person, related_name='employers', through='Employment') + employees = models.ManyToManyField( + Person, related_name="employers", through="Employment" + ) def __str__(self): return self.name @@ -681,12 +713,12 @@ class Classroom(models.Model): name = models.CharField(max_length=20) has_blackboard = models.BooleanField(null=True) school = models.ForeignKey(School, models.CASCADE) - students = models.ManyToManyField(Student, related_name='classroom') + students = models.ManyToManyField(Student, related_name="classroom") class Teacher(models.Model): schools = models.ManyToManyField(School) - friends = models.ManyToManyField('self') + friends = models.ManyToManyField("self") class Ticket23605AParent(models.Model): @@ -713,18 +745,20 @@ class Individual(models.Model): alive = models.BooleanField() class Meta: - db_table = 'Individual' + db_table = "Individual" class RelatedIndividual(models.Model): - related = models.ForeignKey(Individual, models.CASCADE, related_name='related_individual') + related = models.ForeignKey( + Individual, models.CASCADE, related_name="related_individual" + ) class Meta: - db_table = 'RelatedIndividual' + db_table = "RelatedIndividual" class CustomDbColumn(models.Model): - custom_column = models.IntegerField(db_column='custom_name', null=True) + custom_column = models.IntegerField(db_column="custom_name", null=True) ip_address = models.GenericIPAddressField(null=True) @@ -732,7 +766,7 @@ class CreatedField(models.DateTimeField): db_returning = True def __init__(self, *args, **kwargs): - kwargs.setdefault('default', Now) + kwargs.setdefault("default", Now) super().__init__(*args, **kwargs) @@ -748,4 +782,4 @@ class JSONFieldNullable(models.Model): json_field = models.JSONField(blank=True, null=True) class Meta: - required_db_features = {'supports_json_field'} + required_db_features = {"supports_json_field"} diff --git a/tests/queries/test_bulk_update.py b/tests/queries/test_bulk_update.py index b63046f9d2..389d6c1c41 100644 --- a/tests/queries/test_bulk_update.py +++ b/tests/queries/test_bulk_update.py @@ -6,67 +6,74 @@ from django.db.models.functions import Lower from django.test import TestCase, skipUnlessDBFeature from .models import ( - Article, CustomDbColumn, CustomPk, Detail, Individual, JSONFieldNullable, - Member, Note, Number, Order, Paragraph, RelatedObject, SingleObject, - SpecialCategory, Tag, Valid, + Article, + CustomDbColumn, + CustomPk, + Detail, + Individual, + JSONFieldNullable, + Member, + Note, + Number, + Order, + Paragraph, + RelatedObject, + SingleObject, + SpecialCategory, + Tag, + Valid, ) class BulkUpdateNoteTests(TestCase): @classmethod def setUpTestData(cls): - cls.notes = [ - Note.objects.create(note=str(i), misc=str(i)) - for i in range(10) - ] + cls.notes = [Note.objects.create(note=str(i), misc=str(i)) for i in range(10)] def create_tags(self): - self.tags = [ - Tag.objects.create(name=str(i)) - for i in range(10) - ] + self.tags = [Tag.objects.create(name=str(i)) for i in range(10)] def test_simple(self): for note in self.notes: - note.note = 'test-%s' % note.id + note.note = "test-%s" % note.id with self.assertNumQueries(1): - Note.objects.bulk_update(self.notes, ['note']) + Note.objects.bulk_update(self.notes, ["note"]) self.assertCountEqual( - Note.objects.values_list('note', flat=True), - [cat.note for cat in self.notes] + Note.objects.values_list("note", flat=True), + [cat.note for cat in self.notes], ) def test_multiple_fields(self): for note in self.notes: - note.note = 'test-%s' % note.id - note.misc = 'misc-%s' % note.id + note.note = "test-%s" % note.id + note.misc = "misc-%s" % note.id with self.assertNumQueries(1): - Note.objects.bulk_update(self.notes, ['note', 'misc']) + Note.objects.bulk_update(self.notes, ["note", "misc"]) self.assertCountEqual( - Note.objects.values_list('note', flat=True), - [cat.note for cat in self.notes] + Note.objects.values_list("note", flat=True), + [cat.note for cat in self.notes], ) self.assertCountEqual( - Note.objects.values_list('misc', flat=True), - [cat.misc for cat in self.notes] + Note.objects.values_list("misc", flat=True), + [cat.misc for cat in self.notes], ) def test_batch_size(self): with self.assertNumQueries(len(self.notes)): - Note.objects.bulk_update(self.notes, fields=['note'], batch_size=1) + Note.objects.bulk_update(self.notes, fields=["note"], batch_size=1) def test_unsaved_models(self): - objs = self.notes + [Note(note='test', misc='test')] - msg = 'All bulk_update() objects must have a primary key set.' + objs = self.notes + [Note(note="test", misc="test")] + msg = "All bulk_update() objects must have a primary key set." with self.assertRaisesMessage(ValueError, msg): - Note.objects.bulk_update(objs, fields=['note']) + Note.objects.bulk_update(objs, fields=["note"]) def test_foreign_keys_do_not_lookup(self): self.create_tags() for note, tag in zip(self.notes, self.tags): note.tag = tag with self.assertNumQueries(1): - Note.objects.bulk_update(self.notes, ['tag']) + Note.objects.bulk_update(self.notes, ["tag"]) self.assertSequenceEqual(Note.objects.filter(tag__isnull=False), self.notes) def test_set_field_to_null(self): @@ -74,7 +81,7 @@ class BulkUpdateNoteTests(TestCase): Note.objects.update(tag=self.tags[0]) for note in self.notes: note.tag = None - Note.objects.bulk_update(self.notes, ['tag']) + Note.objects.bulk_update(self.notes, ["tag"]) self.assertCountEqual(Note.objects.filter(tag__isnull=True), self.notes) def test_set_mixed_fields_to_null(self): @@ -85,106 +92,106 @@ class BulkUpdateNoteTests(TestCase): note.tag = None for note in bottom: note.tag = self.tags[0] - Note.objects.bulk_update(self.notes, ['tag']) + Note.objects.bulk_update(self.notes, ["tag"]) self.assertCountEqual(Note.objects.filter(tag__isnull=True), top) self.assertCountEqual(Note.objects.filter(tag__isnull=False), bottom) def test_functions(self): - Note.objects.update(note='TEST') + Note.objects.update(note="TEST") for note in self.notes: - note.note = Lower('note') - Note.objects.bulk_update(self.notes, ['note']) - self.assertEqual(set(Note.objects.values_list('note', flat=True)), {'test'}) + note.note = Lower("note") + Note.objects.bulk_update(self.notes, ["note"]) + self.assertEqual(set(Note.objects.values_list("note", flat=True)), {"test"}) # Tests that use self.notes go here, otherwise put them in another class. class BulkUpdateTests(TestCase): def test_no_fields(self): - msg = 'Field names must be given to bulk_update().' + msg = "Field names must be given to bulk_update()." with self.assertRaisesMessage(ValueError, msg): Note.objects.bulk_update([], fields=[]) def test_invalid_batch_size(self): - msg = 'Batch size must be a positive integer.' + msg = "Batch size must be a positive integer." with self.assertRaisesMessage(ValueError, msg): - Note.objects.bulk_update([], fields=['note'], batch_size=-1) + Note.objects.bulk_update([], fields=["note"], batch_size=-1) def test_nonexistent_field(self): - with self.assertRaisesMessage(FieldDoesNotExist, "Note has no field named 'nonexistent'"): - Note.objects.bulk_update([], ['nonexistent']) + with self.assertRaisesMessage( + FieldDoesNotExist, "Note has no field named 'nonexistent'" + ): + Note.objects.bulk_update([], ["nonexistent"]) - pk_fields_error = 'bulk_update() cannot be used with primary key fields.' + pk_fields_error = "bulk_update() cannot be used with primary key fields." def test_update_primary_key(self): with self.assertRaisesMessage(ValueError, self.pk_fields_error): - Note.objects.bulk_update([], ['id']) + Note.objects.bulk_update([], ["id"]) def test_update_custom_primary_key(self): with self.assertRaisesMessage(ValueError, self.pk_fields_error): - CustomPk.objects.bulk_update([], ['name']) + CustomPk.objects.bulk_update([], ["name"]) def test_empty_objects(self): with self.assertNumQueries(0): - rows_updated = Note.objects.bulk_update([], ['note']) + rows_updated = Note.objects.bulk_update([], ["note"]) self.assertEqual(rows_updated, 0) def test_large_batch(self): - Note.objects.bulk_create([ - Note(note=str(i), misc=str(i)) - for i in range(0, 2000) - ]) + Note.objects.bulk_create( + [Note(note=str(i), misc=str(i)) for i in range(0, 2000)] + ) notes = list(Note.objects.all()) - rows_updated = Note.objects.bulk_update(notes, ['note']) + rows_updated = Note.objects.bulk_update(notes, ["note"]) self.assertEqual(rows_updated, 2000) def test_updated_rows_when_passing_duplicates(self): - note = Note.objects.create(note='test-note', misc='test') - rows_updated = Note.objects.bulk_update([note, note], ['note']) + note = Note.objects.create(note="test-note", misc="test") + rows_updated = Note.objects.bulk_update([note, note], ["note"]) self.assertEqual(rows_updated, 1) # Duplicates in different batches. - rows_updated = Note.objects.bulk_update([note, note], ['note'], batch_size=1) + rows_updated = Note.objects.bulk_update([note, note], ["note"], batch_size=1) self.assertEqual(rows_updated, 2) def test_only_concrete_fields_allowed(self): - obj = Valid.objects.create(valid='test') - detail = Detail.objects.create(data='test') - paragraph = Paragraph.objects.create(text='test') - Member.objects.create(name='test', details=detail) - msg = 'bulk_update() can only be used with concrete fields.' + obj = Valid.objects.create(valid="test") + detail = Detail.objects.create(data="test") + paragraph = Paragraph.objects.create(text="test") + Member.objects.create(name="test", details=detail) + msg = "bulk_update() can only be used with concrete fields." with self.assertRaisesMessage(ValueError, msg): - Detail.objects.bulk_update([detail], fields=['member']) + Detail.objects.bulk_update([detail], fields=["member"]) with self.assertRaisesMessage(ValueError, msg): - Paragraph.objects.bulk_update([paragraph], fields=['page']) + Paragraph.objects.bulk_update([paragraph], fields=["page"]) with self.assertRaisesMessage(ValueError, msg): - Valid.objects.bulk_update([obj], fields=['parent']) + Valid.objects.bulk_update([obj], fields=["parent"]) def test_custom_db_columns(self): model = CustomDbColumn.objects.create(custom_column=1) model.custom_column = 2 - CustomDbColumn.objects.bulk_update([model], fields=['custom_column']) + CustomDbColumn.objects.bulk_update([model], fields=["custom_column"]) model.refresh_from_db() self.assertEqual(model.custom_column, 2) def test_custom_pk(self): custom_pks = [ - CustomPk.objects.create(name='pk-%s' % i, extra='') - for i in range(10) + CustomPk.objects.create(name="pk-%s" % i, extra="") for i in range(10) ] for model in custom_pks: - model.extra = 'extra-%s' % model.pk - CustomPk.objects.bulk_update(custom_pks, ['extra']) + model.extra = "extra-%s" % model.pk + CustomPk.objects.bulk_update(custom_pks, ["extra"]) self.assertCountEqual( - CustomPk.objects.values_list('extra', flat=True), - [cat.extra for cat in custom_pks] + CustomPk.objects.values_list("extra", flat=True), + [cat.extra for cat in custom_pks], ) def test_falsey_pk_value(self): - order = Order.objects.create(pk=0, name='test') - order.name = 'updated' - Order.objects.bulk_update([order], ['name']) + order = Order.objects.create(pk=0, name="test") + order.name = "updated" + Order.objects.bulk_update([order], ["name"]) order.refresh_from_db() - self.assertEqual(order.name, 'updated') + self.assertEqual(order.name, "updated") def test_inherited_fields(self): special_categories = [ @@ -192,53 +199,56 @@ class BulkUpdateTests(TestCase): for i in range(10) ] for category in special_categories: - category.name = 'test-%s' % category.id - category.special_name = 'special-test-%s' % category.special_name - SpecialCategory.objects.bulk_update(special_categories, ['name', 'special_name']) + category.name = "test-%s" % category.id + category.special_name = "special-test-%s" % category.special_name + SpecialCategory.objects.bulk_update( + special_categories, ["name", "special_name"] + ) self.assertCountEqual( - SpecialCategory.objects.values_list('name', flat=True), - [cat.name for cat in special_categories] + SpecialCategory.objects.values_list("name", flat=True), + [cat.name for cat in special_categories], ) self.assertCountEqual( - SpecialCategory.objects.values_list('special_name', flat=True), - [cat.special_name for cat in special_categories] + SpecialCategory.objects.values_list("special_name", flat=True), + [cat.special_name for cat in special_categories], ) def test_field_references(self): numbers = [Number.objects.create(num=0) for _ in range(10)] for number in numbers: - number.num = F('num') + 1 - Number.objects.bulk_update(numbers, ['num']) + number.num = F("num") + 1 + Number.objects.bulk_update(numbers, ["num"]) self.assertCountEqual(Number.objects.filter(num=1), numbers) def test_f_expression(self): notes = [ - Note.objects.create(note='test_note', misc='test_misc') - for _ in range(10) + Note.objects.create(note="test_note", misc="test_misc") for _ in range(10) ] for note in notes: - note.misc = F('note') - Note.objects.bulk_update(notes, ['misc']) - self.assertCountEqual(Note.objects.filter(misc='test_note'), notes) + note.misc = F("note") + Note.objects.bulk_update(notes, ["misc"]) + self.assertCountEqual(Note.objects.filter(misc="test_note"), notes) def test_booleanfield(self): individuals = [Individual.objects.create(alive=False) for _ in range(10)] for individual in individuals: individual.alive = True - Individual.objects.bulk_update(individuals, ['alive']) + Individual.objects.bulk_update(individuals, ["alive"]) self.assertCountEqual(Individual.objects.filter(alive=True), individuals) def test_ipaddressfield(self): - for ip in ('2001::1', '1.2.3.4'): + for ip in ("2001::1", "1.2.3.4"): with self.subTest(ip=ip): models = [ - CustomDbColumn.objects.create(ip_address='0.0.0.0') + CustomDbColumn.objects.create(ip_address="0.0.0.0") for _ in range(10) ] for model in models: model.ip_address = ip - CustomDbColumn.objects.bulk_update(models, ['ip_address']) - self.assertCountEqual(CustomDbColumn.objects.filter(ip_address=ip), models) + CustomDbColumn.objects.bulk_update(models, ["ip_address"]) + self.assertCountEqual( + CustomDbColumn.objects.filter(ip_address=ip), models + ) def test_datetime_field(self): articles = [ @@ -248,26 +258,28 @@ class BulkUpdateTests(TestCase): point_in_time = datetime.datetime(1991, 10, 31) for article in articles: article.created = point_in_time - Article.objects.bulk_update(articles, ['created']) + Article.objects.bulk_update(articles, ["created"]) self.assertCountEqual(Article.objects.filter(created=point_in_time), articles) - @skipUnlessDBFeature('supports_json_field') + @skipUnlessDBFeature("supports_json_field") def test_json_field(self): - JSONFieldNullable.objects.bulk_create([ - JSONFieldNullable(json_field={'a': i}) for i in range(10) - ]) + JSONFieldNullable.objects.bulk_create( + [JSONFieldNullable(json_field={"a": i}) for i in range(10)] + ) objs = JSONFieldNullable.objects.all() for obj in objs: - obj.json_field = {'c': obj.json_field['a'] + 1} - JSONFieldNullable.objects.bulk_update(objs, ['json_field']) - self.assertCountEqual(JSONFieldNullable.objects.filter(json_field__has_key='c'), objs) + obj.json_field = {"c": obj.json_field["a"] + 1} + JSONFieldNullable.objects.bulk_update(objs, ["json_field"]) + self.assertCountEqual( + JSONFieldNullable.objects.filter(json_field__has_key="c"), objs + ) def test_nullable_fk_after_related_save(self): parent = RelatedObject.objects.create() child = SingleObject() parent.single = child parent.single.save() - RelatedObject.objects.bulk_update([parent], fields=['single']) + RelatedObject.objects.bulk_update([parent], fields=["single"]) self.assertEqual(parent.single_id, parent.single.pk) parent.refresh_from_db() self.assertEqual(parent.single, child) @@ -280,13 +292,13 @@ class BulkUpdateTests(TestCase): "related object 'single'." ) with self.assertRaisesMessage(ValueError, msg): - RelatedObject.objects.bulk_update([parent], fields=['single']) + RelatedObject.objects.bulk_update([parent], fields=["single"]) def test_unspecified_unsaved_parent(self): parent = RelatedObject.objects.create() parent.single = SingleObject() parent.f = 42 - RelatedObject.objects.bulk_update([parent], fields=['f']) + RelatedObject.objects.bulk_update([parent], fields=["f"]) parent.refresh_from_db() self.assertEqual(parent.f, 42) self.assertIsNone(parent.single) diff --git a/tests/queries/test_contains.py b/tests/queries/test_contains.py index a58dbe180f..2aa4badc72 100644 --- a/tests/queries/test_contains.py +++ b/tests/queries/test_contains.py @@ -10,7 +10,7 @@ class ContainsTests(TestCase): cls.proxy_category = ProxyCategory.objects.create() def test_unsaved_obj(self): - msg = 'QuerySet.contains() cannot be used on unsaved objects.' + msg = "QuerySet.contains() cannot be used on unsaved objects." with self.assertRaisesMessage(ValueError, msg): DumbCategory.objects.contains(DumbCategory()) @@ -20,11 +20,11 @@ class ContainsTests(TestCase): DumbCategory.objects.contains(object()) def test_values(self): - msg = 'Cannot call QuerySet.contains() after .values() or .values_list().' + msg = "Cannot call QuerySet.contains() after .values() or .values_list()." with self.assertRaisesMessage(TypeError, msg): - DumbCategory.objects.values_list('pk').contains(self.category) + DumbCategory.objects.values_list("pk").contains(self.category) with self.assertRaisesMessage(TypeError, msg): - DumbCategory.objects.values('pk').contains(self.category) + DumbCategory.objects.values("pk").contains(self.category) def test_basic(self): with self.assertNumQueries(1): @@ -53,7 +53,7 @@ class ContainsTests(TestCase): def test_wrong_model(self): qs = DumbCategory.objects.all() - named_category = NamedCategory(name='category') + named_category = NamedCategory(name="category") with self.assertNumQueries(0): self.assertIs(qs.contains(named_category), False) # Evaluate the queryset. diff --git a/tests/queries/test_db_returning.py b/tests/queries/test_db_returning.py index 9ba352a7ab..50c164a57f 100644 --- a/tests/queries/test_db_returning.py +++ b/tests/queries/test_db_returning.py @@ -7,17 +7,18 @@ from django.test.utils import CaptureQueriesContext from .models import DumbCategory, NonIntegerPKReturningModel, ReturningModel -@skipUnlessDBFeature('can_return_columns_from_insert') +@skipUnlessDBFeature("can_return_columns_from_insert") class ReturningValuesTests(TestCase): def test_insert_returning(self): with CaptureQueriesContext(connection) as captured_queries: DumbCategory.objects.create() self.assertIn( - 'RETURNING %s.%s' % ( + "RETURNING %s.%s" + % ( connection.ops.quote_name(DumbCategory._meta.db_table), - connection.ops.quote_name(DumbCategory._meta.get_field('id').column), + connection.ops.quote_name(DumbCategory._meta.get_field("id").column), ), - captured_queries[-1]['sql'], + captured_queries[-1]["sql"], ) def test_insert_returning_non_integer(self): @@ -30,20 +31,23 @@ class ReturningValuesTests(TestCase): obj = ReturningModel.objects.create() table_name = connection.ops.quote_name(ReturningModel._meta.db_table) self.assertIn( - 'RETURNING %s.%s, %s.%s' % ( + "RETURNING %s.%s, %s.%s" + % ( table_name, - connection.ops.quote_name(ReturningModel._meta.get_field('id').column), + connection.ops.quote_name(ReturningModel._meta.get_field("id").column), table_name, - connection.ops.quote_name(ReturningModel._meta.get_field('created').column), + connection.ops.quote_name( + ReturningModel._meta.get_field("created").column + ), ), - captured_queries[-1]['sql'], + captured_queries[-1]["sql"], ) self.assertTrue(obj.pk) self.assertIsInstance(obj.created, datetime.datetime) - @skipUnlessDBFeature('can_return_rows_from_bulk_insert') + @skipUnlessDBFeature("can_return_rows_from_bulk_insert") def test_bulk_insert(self): - objs = [ReturningModel(), ReturningModel(pk=2 ** 11), ReturningModel()] + objs = [ReturningModel(), ReturningModel(pk=2**11), ReturningModel()] ReturningModel.objects.bulk_create(objs) for obj in objs: with self.subTest(obj=obj): diff --git a/tests/queries/test_explain.py b/tests/queries/test_explain.py index f1108c062f..2acd6a685c 100644 --- a/tests/queries/test_explain.py +++ b/tests/queries/test_explain.py @@ -10,110 +10,122 @@ from django.test.utils import CaptureQueriesContext from .models import Tag -@skipUnlessDBFeature('supports_explaining_query_execution') +@skipUnlessDBFeature("supports_explaining_query_execution") class ExplainTests(TestCase): - def test_basic(self): querysets = [ - Tag.objects.filter(name='test'), - Tag.objects.filter(name='test').select_related('parent'), - Tag.objects.filter(name='test').prefetch_related('children'), - Tag.objects.filter(name='test').annotate(Count('children')), - Tag.objects.filter(name='test').values_list('name'), - Tag.objects.order_by().union(Tag.objects.order_by().filter(name='test')), - Tag.objects.all().select_for_update().filter(name='test'), + Tag.objects.filter(name="test"), + Tag.objects.filter(name="test").select_related("parent"), + Tag.objects.filter(name="test").prefetch_related("children"), + Tag.objects.filter(name="test").annotate(Count("children")), + Tag.objects.filter(name="test").values_list("name"), + Tag.objects.order_by().union(Tag.objects.order_by().filter(name="test")), + Tag.objects.all().select_for_update().filter(name="test"), ] supported_formats = connection.features.supported_explain_formats - all_formats = (None,) + tuple(supported_formats) + tuple(f.lower() for f in supported_formats) + all_formats = ( + (None,) + + tuple(supported_formats) + + tuple(f.lower() for f in supported_formats) + ) for idx, queryset in enumerate(querysets): for format in all_formats: with self.subTest(format=format, queryset=idx): - with self.assertNumQueries(1), CaptureQueriesContext(connection) as captured_queries: + with self.assertNumQueries(1), CaptureQueriesContext( + connection + ) as captured_queries: result = queryset.explain(format=format) - self.assertTrue(captured_queries[0]['sql'].startswith(connection.ops.explain_prefix)) + self.assertTrue( + captured_queries[0]["sql"].startswith( + connection.ops.explain_prefix + ) + ) self.assertIsInstance(result, str) self.assertTrue(result) - if format == 'xml': + if format == "xml": try: xml.etree.ElementTree.fromstring(result) except xml.etree.ElementTree.ParseError as e: self.fail( - f'QuerySet.explain() result is not valid XML: {e}' + f"QuerySet.explain() result is not valid XML: {e}" ) - elif format == 'json': + elif format == "json": try: json.loads(result) except json.JSONDecodeError as e: self.fail( - f'QuerySet.explain() result is not valid JSON: {e}' + f"QuerySet.explain() result is not valid JSON: {e}" ) - @skipUnlessDBFeature('validates_explain_options') + @skipUnlessDBFeature("validates_explain_options") def test_unknown_options(self): - with self.assertRaisesMessage(ValueError, 'Unknown options: test, test2'): + with self.assertRaisesMessage(ValueError, "Unknown options: test, test2"): Tag.objects.all().explain(test=1, test2=1) def test_unknown_format(self): - msg = 'DOES NOT EXIST is not a recognized format.' + msg = "DOES NOT EXIST is not a recognized format." if connection.features.supported_explain_formats: - msg += ' Allowed formats: %s' % ', '.join(sorted(connection.features.supported_explain_formats)) + msg += " Allowed formats: %s" % ", ".join( + sorted(connection.features.supported_explain_formats) + ) with self.assertRaisesMessage(ValueError, msg): - Tag.objects.all().explain(format='does not exist') + Tag.objects.all().explain(format="does not exist") - @unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific') + @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific") def test_postgres_options(self): - qs = Tag.objects.filter(name='test') + qs = Tag.objects.filter(name="test") test_options = [ - {'COSTS': False, 'BUFFERS': True, 'ANALYZE': True}, - {'costs': False, 'buffers': True, 'analyze': True}, - {'verbose': True, 'timing': True, 'analyze': True}, - {'verbose': False, 'timing': False, 'analyze': True}, - {'summary': True}, + {"COSTS": False, "BUFFERS": True, "ANALYZE": True}, + {"costs": False, "buffers": True, "analyze": True}, + {"verbose": True, "timing": True, "analyze": True}, + {"verbose": False, "timing": False, "analyze": True}, + {"summary": True}, ] if connection.features.is_postgresql_12: - test_options.append({'settings': True}) + test_options.append({"settings": True}) if connection.features.is_postgresql_13: - test_options.append({'analyze': True, 'wal': True}) + test_options.append({"analyze": True, "wal": True}) for options in test_options: with self.subTest(**options), transaction.atomic(): with CaptureQueriesContext(connection) as captured_queries: - qs.explain(format='text', **options) + qs.explain(format="text", **options) self.assertEqual(len(captured_queries), 1) for name, value in options.items(): - option = '{} {}'.format(name.upper(), 'true' if value else 'false') - self.assertIn(option, captured_queries[0]['sql']) + option = "{} {}".format(name.upper(), "true" if value else "false") + self.assertIn(option, captured_queries[0]["sql"]) - @unittest.skipUnless(connection.vendor == 'mysql', 'MySQL specific') + @unittest.skipUnless(connection.vendor == "mysql", "MySQL specific") def test_mysql_text_to_traditional(self): # Ensure these cached properties are initialized to prevent queries for # the MariaDB or MySQL version during the QuerySet evaluation. connection.features.supported_explain_formats with CaptureQueriesContext(connection) as captured_queries: - Tag.objects.filter(name='test').explain(format='text') + Tag.objects.filter(name="test").explain(format="text") self.assertEqual(len(captured_queries), 1) - self.assertIn('FORMAT=TRADITIONAL', captured_queries[0]['sql']) + self.assertIn("FORMAT=TRADITIONAL", captured_queries[0]["sql"]) - @unittest.skipUnless(connection.vendor == 'mysql', 'MariaDB and MySQL >= 8.0.18 specific.') + @unittest.skipUnless( + connection.vendor == "mysql", "MariaDB and MySQL >= 8.0.18 specific." + ) def test_mysql_analyze(self): - qs = Tag.objects.filter(name='test') + qs = Tag.objects.filter(name="test") with CaptureQueriesContext(connection) as captured_queries: qs.explain(analyze=True) self.assertEqual(len(captured_queries), 1) - prefix = 'ANALYZE ' if connection.mysql_is_mariadb else 'EXPLAIN ANALYZE ' - self.assertTrue(captured_queries[0]['sql'].startswith(prefix)) + prefix = "ANALYZE " if connection.mysql_is_mariadb else "EXPLAIN ANALYZE " + self.assertTrue(captured_queries[0]["sql"].startswith(prefix)) with CaptureQueriesContext(connection) as captured_queries: - qs.explain(analyze=True, format='JSON') + qs.explain(analyze=True, format="JSON") self.assertEqual(len(captured_queries), 1) if connection.mysql_is_mariadb: - self.assertIn('FORMAT=JSON', captured_queries[0]['sql']) + self.assertIn("FORMAT=JSON", captured_queries[0]["sql"]) else: - self.assertNotIn('FORMAT=JSON', captured_queries[0]['sql']) + self.assertNotIn("FORMAT=JSON", captured_queries[0]["sql"]) -@skipIfDBFeature('supports_explaining_query_execution') +@skipIfDBFeature("supports_explaining_query_execution") class ExplainUnsupportedTests(TestCase): - def test_message(self): - msg = 'This backend does not support explaining query execution.' + msg = "This backend does not support explaining query execution." with self.assertRaisesMessage(NotSupportedError, msg): - Tag.objects.filter(name='test').explain() + Tag.objects.filter(name="test").explain() diff --git a/tests/queries/test_iterator.py b/tests/queries/test_iterator.py index 7fc37b00a1..558bd15c02 100644 --- a/tests/queries/test_iterator.py +++ b/tests/queries/test_iterator.py @@ -13,18 +13,22 @@ class QuerySetIteratorTests(TestCase): @classmethod def setUpTestData(cls): - Article.objects.create(name='Article 1', created=datetime.datetime.now()) - Article.objects.create(name='Article 2', created=datetime.datetime.now()) + Article.objects.create(name="Article 1", created=datetime.datetime.now()) + Article.objects.create(name="Article 2", created=datetime.datetime.now()) def test_iterator_invalid_chunk_size(self): for size in (0, -1): with self.subTest(size=size): - with self.assertRaisesMessage(ValueError, 'Chunk size must be strictly positive.'): + with self.assertRaisesMessage( + ValueError, "Chunk size must be strictly positive." + ): Article.objects.iterator(chunk_size=size) def test_default_iterator_chunk_size(self): qs = Article.objects.iterator() - with mock.patch('django.db.models.sql.compiler.cursor_iter', side_effect=cursor_iter) as cursor_iter_mock: + with mock.patch( + "django.db.models.sql.compiler.cursor_iter", side_effect=cursor_iter + ) as cursor_iter_mock: next(qs) self.assertEqual(cursor_iter_mock.call_count, 1) mock_args, _mock_kwargs = cursor_iter_mock.call_args @@ -33,7 +37,9 @@ class QuerySetIteratorTests(TestCase): def test_iterator_chunk_size(self): batch_size = 3 qs = Article.objects.iterator(chunk_size=batch_size) - with mock.patch('django.db.models.sql.compiler.cursor_iter', side_effect=cursor_iter) as cursor_iter_mock: + with mock.patch( + "django.db.models.sql.compiler.cursor_iter", side_effect=cursor_iter + ) as cursor_iter_mock: next(qs) self.assertEqual(cursor_iter_mock.call_count, 1) mock_args, _mock_kwargs = cursor_iter_mock.call_args @@ -47,6 +53,6 @@ class QuerySetIteratorTests(TestCase): qs = Article.objects.all() compiler = qs.query.get_compiler(using=qs.db) features = connections[qs.db].features - with mock.patch.object(features, 'can_use_chunked_reads', False): + with mock.patch.object(features, "can_use_chunked_reads", False): result = compiler.execute_sql(chunked_fetch=True) self.assertIsInstance(result, list) diff --git a/tests/queries/test_q.py b/tests/queries/test_q.py index bd39b2dc32..b1dc45be13 100644 --- a/tests/queries/test_q.py +++ b/tests/queries/test_q.py @@ -52,7 +52,7 @@ class QTests(SimpleTestCase): q & obj def test_combine_negated_boolean_expression(self): - tagged = Tag.objects.filter(category=OuterRef('pk')) + tagged = Tag.objects.filter(category=OuterRef("pk")) tests = [ Q() & ~Exists(tagged), Q() | ~Exists(tagged), @@ -62,82 +62,91 @@ class QTests(SimpleTestCase): self.assertIs(q.negated, True) def test_deconstruct(self): - q = Q(price__gt=F('discounted_price')) + q = Q(price__gt=F("discounted_price")) path, args, kwargs = q.deconstruct() - self.assertEqual(path, 'django.db.models.Q') - self.assertEqual(args, (('price__gt', F('discounted_price')),)) + self.assertEqual(path, "django.db.models.Q") + self.assertEqual(args, (("price__gt", F("discounted_price")),)) self.assertEqual(kwargs, {}) def test_deconstruct_negated(self): - q = ~Q(price__gt=F('discounted_price')) + q = ~Q(price__gt=F("discounted_price")) path, args, kwargs = q.deconstruct() - self.assertEqual(args, (('price__gt', F('discounted_price')),)) - self.assertEqual(kwargs, {'_negated': True}) + self.assertEqual(args, (("price__gt", F("discounted_price")),)) + self.assertEqual(kwargs, {"_negated": True}) def test_deconstruct_or(self): - q1 = Q(price__gt=F('discounted_price')) - q2 = Q(price=F('discounted_price')) + q1 = Q(price__gt=F("discounted_price")) + q2 = Q(price=F("discounted_price")) q = q1 | q2 path, args, kwargs = q.deconstruct() - self.assertEqual(args, ( - ('price__gt', F('discounted_price')), - ('price', F('discounted_price')), - )) - self.assertEqual(kwargs, {'_connector': 'OR'}) + self.assertEqual( + args, + ( + ("price__gt", F("discounted_price")), + ("price", F("discounted_price")), + ), + ) + self.assertEqual(kwargs, {"_connector": "OR"}) def test_deconstruct_and(self): - q1 = Q(price__gt=F('discounted_price')) - q2 = Q(price=F('discounted_price')) + q1 = Q(price__gt=F("discounted_price")) + q2 = Q(price=F("discounted_price")) q = q1 & q2 path, args, kwargs = q.deconstruct() - self.assertEqual(args, ( - ('price__gt', F('discounted_price')), - ('price', F('discounted_price')), - )) + self.assertEqual( + args, + ( + ("price__gt", F("discounted_price")), + ("price", F("discounted_price")), + ), + ) self.assertEqual(kwargs, {}) def test_deconstruct_multiple_kwargs(self): - q = Q(price__gt=F('discounted_price'), price=F('discounted_price')) + q = Q(price__gt=F("discounted_price"), price=F("discounted_price")) path, args, kwargs = q.deconstruct() - self.assertEqual(args, ( - ('price', F('discounted_price')), - ('price__gt', F('discounted_price')), - )) + self.assertEqual( + args, + ( + ("price", F("discounted_price")), + ("price__gt", F("discounted_price")), + ), + ) self.assertEqual(kwargs, {}) def test_deconstruct_nested(self): - q = Q(Q(price__gt=F('discounted_price'))) + q = Q(Q(price__gt=F("discounted_price"))) path, args, kwargs = q.deconstruct() - self.assertEqual(args, (Q(price__gt=F('discounted_price')),)) + self.assertEqual(args, (Q(price__gt=F("discounted_price")),)) self.assertEqual(kwargs, {}) def test_deconstruct_boolean_expression(self): - expr = RawSQL('1 = 1', BooleanField()) + expr = RawSQL("1 = 1", BooleanField()) q = Q(expr) _, args, kwargs = q.deconstruct() self.assertEqual(args, (expr,)) self.assertEqual(kwargs, {}) def test_reconstruct(self): - q = Q(price__gt=F('discounted_price')) + q = Q(price__gt=F("discounted_price")) path, args, kwargs = q.deconstruct() self.assertEqual(Q(*args, **kwargs), q) def test_reconstruct_negated(self): - q = ~Q(price__gt=F('discounted_price')) + q = ~Q(price__gt=F("discounted_price")) path, args, kwargs = q.deconstruct() self.assertEqual(Q(*args, **kwargs), q) def test_reconstruct_or(self): - q1 = Q(price__gt=F('discounted_price')) - q2 = Q(price=F('discounted_price')) + q1 = Q(price__gt=F("discounted_price")) + q2 = Q(price=F("discounted_price")) q = q1 | q2 path, args, kwargs = q.deconstruct() self.assertEqual(Q(*args, **kwargs), q) def test_reconstruct_and(self): - q1 = Q(price__gt=F('discounted_price')) - q2 = Q(price=F('discounted_price')) + q1 = Q(price__gt=F("discounted_price")) + q2 = Q(price=F("discounted_price")) q = q1 & q2 path, args, kwargs = q.deconstruct() self.assertEqual(Q(*args, **kwargs), q) diff --git a/tests/queries/test_qs_combinators.py b/tests/queries/test_qs_combinators.py index 37ce37c2c1..000185bbe6 100644 --- a/tests/queries/test_qs_combinators.py +++ b/tests/queries/test_qs_combinators.py @@ -8,14 +8,16 @@ from django.test.utils import CaptureQueriesContext from .models import Author, Celebrity, ExtraInfo, Number, ReservedName -@skipUnlessDBFeature('supports_select_union') +@skipUnlessDBFeature("supports_select_union") class QuerySetSetOperationTests(TestCase): @classmethod def setUpTestData(cls): Number.objects.bulk_create(Number(num=i, other_num=10 - i) for i in range(10)) def assertNumbersEqual(self, queryset, expected_numbers, ordered=True): - self.assertQuerysetEqual(queryset, expected_numbers, operator.attrgetter('num'), ordered) + self.assertQuerysetEqual( + queryset, expected_numbers, operator.attrgetter("num"), ordered + ) def test_simple_union(self): qs1 = Number.objects.filter(num__lte=1) @@ -23,24 +25,24 @@ class QuerySetSetOperationTests(TestCase): qs3 = Number.objects.filter(num=5) self.assertNumbersEqual(qs1.union(qs2, qs3), [0, 1, 5, 8, 9], ordered=False) - @skipUnlessDBFeature('supports_select_intersection') + @skipUnlessDBFeature("supports_select_intersection") def test_simple_intersection(self): qs1 = Number.objects.filter(num__lte=5) qs2 = Number.objects.filter(num__gte=5) qs3 = Number.objects.filter(num__gte=4, num__lte=6) self.assertNumbersEqual(qs1.intersection(qs2, qs3), [5], ordered=False) - @skipUnlessDBFeature('supports_select_intersection') + @skipUnlessDBFeature("supports_select_intersection") def test_intersection_with_values(self): - ReservedName.objects.create(name='a', order=2) + ReservedName.objects.create(name="a", order=2) qs1 = ReservedName.objects.all() - reserved_name = qs1.intersection(qs1).values('name', 'order', 'id').get() - self.assertEqual(reserved_name['name'], 'a') - self.assertEqual(reserved_name['order'], 2) - reserved_name = qs1.intersection(qs1).values_list('name', 'order', 'id').get() - self.assertEqual(reserved_name[:2], ('a', 2)) + reserved_name = qs1.intersection(qs1).values("name", "order", "id").get() + self.assertEqual(reserved_name["name"], "a") + self.assertEqual(reserved_name["order"], 2) + reserved_name = qs1.intersection(qs1).values_list("name", "order", "id").get() + self.assertEqual(reserved_name[:2], ("a", 2)) - @skipUnlessDBFeature('supports_select_difference') + @skipUnlessDBFeature("supports_select_difference") def test_simple_difference(self): qs1 = Number.objects.filter(num__lte=5) qs2 = Number.objects.filter(num__lte=4) @@ -59,7 +61,7 @@ class QuerySetSetOperationTests(TestCase): self.assertSequenceEqual(qs3.none(), []) self.assertNumbersEqual(qs3, [0, 1, 8, 9], ordered=False) - @skipUnlessDBFeature('supports_select_intersection') + @skipUnlessDBFeature("supports_select_intersection") def test_intersection_with_empty_qs(self): qs1 = Number.objects.all() qs2 = Number.objects.none() @@ -71,7 +73,7 @@ class QuerySetSetOperationTests(TestCase): self.assertEqual(len(qs2.intersection(qs2)), 0) self.assertEqual(len(qs3.intersection(qs3)), 0) - @skipUnlessDBFeature('supports_select_difference') + @skipUnlessDBFeature("supports_select_difference") def test_difference_with_empty_qs(self): qs1 = Number.objects.all() qs2 = Number.objects.none() @@ -83,16 +85,16 @@ class QuerySetSetOperationTests(TestCase): self.assertEqual(len(qs2.difference(qs2)), 0) self.assertEqual(len(qs3.difference(qs3)), 0) - @skipUnlessDBFeature('supports_select_difference') + @skipUnlessDBFeature("supports_select_difference") def test_difference_with_values(self): - ReservedName.objects.create(name='a', order=2) + ReservedName.objects.create(name="a", order=2) qs1 = ReservedName.objects.all() qs2 = ReservedName.objects.none() - reserved_name = qs1.difference(qs2).values('name', 'order', 'id').get() - self.assertEqual(reserved_name['name'], 'a') - self.assertEqual(reserved_name['order'], 2) - reserved_name = qs1.difference(qs2).values_list('name', 'order', 'id').get() - self.assertEqual(reserved_name[:2], ('a', 2)) + reserved_name = qs1.difference(qs2).values("name", "order", "id").get() + self.assertEqual(reserved_name["name"], "a") + self.assertEqual(reserved_name["order"], 2) + reserved_name = qs1.difference(qs2).values_list("name", "order", "id").get() + self.assertEqual(reserved_name[:2], ("a", 2)) def test_union_with_empty_qs(self): qs1 = Number.objects.all() @@ -108,8 +110,8 @@ class QuerySetSetOperationTests(TestCase): self.assertEqual(len(qs3.union(qs3)), 0) def test_empty_qs_union_with_ordered_qs(self): - qs1 = Number.objects.all().order_by('num') - qs2 = Number.objects.none().union(qs1).order_by('num') + qs1 = Number.objects.all().order_by("num") + qs2 = Number.objects.none().union(qs1).order_by("num") self.assertEqual(list(qs1), list(qs2)) def test_limits(self): @@ -120,94 +122,116 @@ class QuerySetSetOperationTests(TestCase): def test_ordering(self): qs1 = Number.objects.filter(num__lte=1) qs2 = Number.objects.filter(num__gte=2, num__lte=3) - self.assertNumbersEqual(qs1.union(qs2).order_by('-num'), [3, 2, 1, 0]) + self.assertNumbersEqual(qs1.union(qs2).order_by("-num"), [3, 2, 1, 0]) def test_ordering_by_alias(self): - qs1 = Number.objects.filter(num__lte=1).values(alias=F('num')) - qs2 = Number.objects.filter(num__gte=2, num__lte=3).values(alias=F('num')) + qs1 = Number.objects.filter(num__lte=1).values(alias=F("num")) + qs2 = Number.objects.filter(num__gte=2, num__lte=3).values(alias=F("num")) self.assertQuerysetEqual( - qs1.union(qs2).order_by('-alias'), + qs1.union(qs2).order_by("-alias"), [3, 2, 1, 0], - operator.itemgetter('alias'), + operator.itemgetter("alias"), ) def test_ordering_by_f_expression(self): qs1 = Number.objects.filter(num__lte=1) qs2 = Number.objects.filter(num__gte=2, num__lte=3) - self.assertNumbersEqual(qs1.union(qs2).order_by(F('num').desc()), [3, 2, 1, 0]) + self.assertNumbersEqual(qs1.union(qs2).order_by(F("num").desc()), [3, 2, 1, 0]) def test_ordering_by_f_expression_and_alias(self): - qs1 = Number.objects.filter(num__lte=1).values(alias=F('other_num')) - qs2 = Number.objects.filter(num__gte=2, num__lte=3).values(alias=F('other_num')) + qs1 = Number.objects.filter(num__lte=1).values(alias=F("other_num")) + qs2 = Number.objects.filter(num__gte=2, num__lte=3).values(alias=F("other_num")) self.assertQuerysetEqual( - qs1.union(qs2).order_by(F('alias').desc()), + qs1.union(qs2).order_by(F("alias").desc()), [10, 9, 8, 7], - operator.itemgetter('alias'), + operator.itemgetter("alias"), ) Number.objects.create(num=-1) self.assertQuerysetEqual( - qs1.union(qs2).order_by(F('alias').desc(nulls_last=True)), + qs1.union(qs2).order_by(F("alias").desc(nulls_last=True)), [10, 9, 8, 7, None], - operator.itemgetter('alias'), + operator.itemgetter("alias"), ) def test_union_with_values(self): - ReservedName.objects.create(name='a', order=2) + ReservedName.objects.create(name="a", order=2) qs1 = ReservedName.objects.all() - reserved_name = qs1.union(qs1).values('name', 'order', 'id').get() - self.assertEqual(reserved_name['name'], 'a') - self.assertEqual(reserved_name['order'], 2) - reserved_name = qs1.union(qs1).values_list('name', 'order', 'id').get() - self.assertEqual(reserved_name[:2], ('a', 2)) + reserved_name = qs1.union(qs1).values("name", "order", "id").get() + self.assertEqual(reserved_name["name"], "a") + self.assertEqual(reserved_name["order"], 2) + reserved_name = qs1.union(qs1).values_list("name", "order", "id").get() + self.assertEqual(reserved_name[:2], ("a", 2)) # List of columns can be changed. - reserved_name = qs1.union(qs1).values_list('order').get() + reserved_name = qs1.union(qs1).values_list("order").get() self.assertEqual(reserved_name, (2,)) def test_union_with_two_annotated_values_list(self): - qs1 = Number.objects.filter(num=1).annotate( - count=Value(0, IntegerField()), - ).values_list('num', 'count') - qs2 = Number.objects.filter(num=2).values('pk').annotate( - count=F('num'), - ).annotate( - num=Value(1, IntegerField()), - ).values_list('num', 'count') + qs1 = ( + Number.objects.filter(num=1) + .annotate( + count=Value(0, IntegerField()), + ) + .values_list("num", "count") + ) + qs2 = ( + Number.objects.filter(num=2) + .values("pk") + .annotate( + count=F("num"), + ) + .annotate( + num=Value(1, IntegerField()), + ) + .values_list("num", "count") + ) self.assertCountEqual(qs1.union(qs2), [(1, 0), (2, 1)]) def test_union_with_extra_and_values_list(self): - qs1 = Number.objects.filter(num=1).extra( - select={'count': 0}, - ).values_list('num', 'count') - qs2 = Number.objects.filter(num=2).extra(select={'count': 1}) + qs1 = ( + Number.objects.filter(num=1) + .extra( + select={"count": 0}, + ) + .values_list("num", "count") + ) + qs2 = Number.objects.filter(num=2).extra(select={"count": 1}) self.assertCountEqual(qs1.union(qs2), [(1, 0), (2, 1)]) def test_union_with_values_list_on_annotated_and_unannotated(self): - ReservedName.objects.create(name='rn1', order=1) + ReservedName.objects.create(name="rn1", order=1) qs1 = Number.objects.annotate( - has_reserved_name=Exists(ReservedName.objects.filter(order=OuterRef('num'))) + has_reserved_name=Exists(ReservedName.objects.filter(order=OuterRef("num"))) ).filter(has_reserved_name=True) qs2 = Number.objects.filter(num=9) - self.assertCountEqual(qs1.union(qs2).values_list('num', flat=True), [1, 9]) + self.assertCountEqual(qs1.union(qs2).values_list("num", flat=True), [1, 9]) def test_union_with_values_list_and_order(self): - ReservedName.objects.bulk_create([ - ReservedName(name='rn1', order=7), - ReservedName(name='rn2', order=5), - ReservedName(name='rn0', order=6), - ReservedName(name='rn9', order=-1), - ]) + ReservedName.objects.bulk_create( + [ + ReservedName(name="rn1", order=7), + ReservedName(name="rn2", order=5), + ReservedName(name="rn0", order=6), + ReservedName(name="rn9", order=-1), + ] + ) qs1 = ReservedName.objects.filter(order__gte=6) qs2 = ReservedName.objects.filter(order__lte=5) union_qs = qs1.union(qs2) for qs, expected_result in ( # Order by a single column. - (union_qs.order_by('-pk').values_list('order', flat=True), [-1, 6, 5, 7]), - (union_qs.order_by('pk').values_list('order', flat=True), [7, 5, 6, -1]), - (union_qs.values_list('order', flat=True).order_by('-pk'), [-1, 6, 5, 7]), - (union_qs.values_list('order', flat=True).order_by('pk'), [7, 5, 6, -1]), + (union_qs.order_by("-pk").values_list("order", flat=True), [-1, 6, 5, 7]), + (union_qs.order_by("pk").values_list("order", flat=True), [7, 5, 6, -1]), + (union_qs.values_list("order", flat=True).order_by("-pk"), [-1, 6, 5, 7]), + (union_qs.values_list("order", flat=True).order_by("pk"), [7, 5, 6, -1]), # Order by multiple columns. - (union_qs.order_by('-name', 'pk').values_list('order', flat=True), [-1, 5, 7, 6]), - (union_qs.values_list('order', flat=True).order_by('-name', 'pk'), [-1, 5, 7, 6]), + ( + union_qs.order_by("-name", "pk").values_list("order", flat=True), + [-1, 5, 7, 6], + ), + ( + union_qs.values_list("order", flat=True).order_by("-name", "pk"), + [-1, 5, 7, 6], + ), ): with self.subTest(qs=qs): self.assertEqual(list(qs), expected_result) @@ -215,92 +239,104 @@ class QuerySetSetOperationTests(TestCase): def test_union_with_values_list_and_order_on_annotation(self): qs1 = Number.objects.annotate( annotation=Value(-1), - multiplier=F('annotation'), + multiplier=F("annotation"), ).filter(num__gte=6) qs2 = Number.objects.annotate( annotation=Value(2), - multiplier=F('annotation'), + multiplier=F("annotation"), ).filter(num__lte=5) self.assertSequenceEqual( - qs1.union(qs2).order_by('annotation', 'num').values_list('num', flat=True), + qs1.union(qs2).order_by("annotation", "num").values_list("num", flat=True), [6, 7, 8, 9, 0, 1, 2, 3, 4, 5], ) self.assertQuerysetEqual( - qs1.union(qs2).order_by( - F('annotation') * F('multiplier'), - 'num', - ).values('num'), + qs1.union(qs2) + .order_by( + F("annotation") * F("multiplier"), + "num", + ) + .values("num"), [6, 7, 8, 9, 0, 1, 2, 3, 4, 5], - operator.itemgetter('num'), + operator.itemgetter("num"), ) def test_union_multiple_models_with_values_list_and_order(self): - reserved_name = ReservedName.objects.create(name='rn1', order=0) + reserved_name = ReservedName.objects.create(name="rn1", order=0) qs1 = Celebrity.objects.all() qs2 = ReservedName.objects.all() self.assertSequenceEqual( - qs1.union(qs2).order_by('name').values_list('pk', flat=True), + qs1.union(qs2).order_by("name").values_list("pk", flat=True), [reserved_name.pk], ) def test_union_multiple_models_with_values_list_and_order_by_extra_select(self): - reserved_name = ReservedName.objects.create(name='rn1', order=0) - qs1 = Celebrity.objects.extra(select={'extra_name': 'name'}) - qs2 = ReservedName.objects.extra(select={'extra_name': 'name'}) + reserved_name = ReservedName.objects.create(name="rn1", order=0) + qs1 = Celebrity.objects.extra(select={"extra_name": "name"}) + qs2 = ReservedName.objects.extra(select={"extra_name": "name"}) self.assertSequenceEqual( - qs1.union(qs2).order_by('extra_name').values_list('pk', flat=True), + qs1.union(qs2).order_by("extra_name").values_list("pk", flat=True), [reserved_name.pk], ) def test_union_in_subquery(self): - ReservedName.objects.bulk_create([ - ReservedName(name='rn1', order=8), - ReservedName(name='rn2', order=1), - ReservedName(name='rn3', order=5), - ]) - qs1 = Number.objects.filter(num__gt=7, num=OuterRef('order')) - qs2 = Number.objects.filter(num__lt=2, num=OuterRef('order')) + ReservedName.objects.bulk_create( + [ + ReservedName(name="rn1", order=8), + ReservedName(name="rn2", order=1), + ReservedName(name="rn3", order=5), + ] + ) + qs1 = Number.objects.filter(num__gt=7, num=OuterRef("order")) + qs2 = Number.objects.filter(num__lt=2, num=OuterRef("order")) self.assertCountEqual( ReservedName.objects.annotate( - number=Subquery(qs1.union(qs2).values('num')), - ).filter(number__isnull=False).values_list('order', flat=True), + number=Subquery(qs1.union(qs2).values("num")), + ) + .filter(number__isnull=False) + .values_list("order", flat=True), [8, 1], ) def test_union_in_subquery_related_outerref(self): - e1 = ExtraInfo.objects.create(value=7, info='e3') - e2 = ExtraInfo.objects.create(value=5, info='e2') - e3 = ExtraInfo.objects.create(value=1, info='e1') - Author.objects.bulk_create([ - Author(name='a1', num=1, extra=e1), - Author(name='a2', num=3, extra=e2), - Author(name='a3', num=2, extra=e3), - ]) - qs1 = ExtraInfo.objects.order_by().filter(value=OuterRef('num')) - qs2 = ExtraInfo.objects.order_by().filter(value__lt=OuterRef('extra__value')) - qs = Author.objects.annotate( - info=Subquery(qs1.union(qs2).values('info')[:1]), - ).filter(info__isnull=False).values_list('name', flat=True) - self.assertCountEqual(qs, ['a1', 'a2']) + e1 = ExtraInfo.objects.create(value=7, info="e3") + e2 = ExtraInfo.objects.create(value=5, info="e2") + e3 = ExtraInfo.objects.create(value=1, info="e1") + Author.objects.bulk_create( + [ + Author(name="a1", num=1, extra=e1), + Author(name="a2", num=3, extra=e2), + Author(name="a3", num=2, extra=e3), + ] + ) + qs1 = ExtraInfo.objects.order_by().filter(value=OuterRef("num")) + qs2 = ExtraInfo.objects.order_by().filter(value__lt=OuterRef("extra__value")) + qs = ( + Author.objects.annotate( + info=Subquery(qs1.union(qs2).values("info")[:1]), + ) + .filter(info__isnull=False) + .values_list("name", flat=True) + ) + self.assertCountEqual(qs, ["a1", "a2"]) # Combined queries don't mutate. - self.assertCountEqual(qs, ['a1', 'a2']) + self.assertCountEqual(qs, ["a1", "a2"]) def test_count_union(self): - qs1 = Number.objects.filter(num__lte=1).values('num') - qs2 = Number.objects.filter(num__gte=2, num__lte=3).values('num') + qs1 = Number.objects.filter(num__lte=1).values("num") + qs2 = Number.objects.filter(num__gte=2, num__lte=3).values("num") self.assertEqual(qs1.union(qs2).count(), 4) def test_count_union_empty_result(self): qs = Number.objects.filter(pk__in=[]) self.assertEqual(qs.union(qs).count(), 0) - @skipUnlessDBFeature('supports_select_difference') + @skipUnlessDBFeature("supports_select_difference") def test_count_difference(self): qs1 = Number.objects.filter(num__lt=10) qs2 = Number.objects.filter(num__lt=9) self.assertEqual(qs1.difference(qs2).count(), 1) - @skipUnlessDBFeature('supports_select_intersection') + @skipUnlessDBFeature("supports_select_intersection") def test_count_intersection(self): qs1 = Number.objects.filter(num__gte=5) qs2 = Number.objects.filter(num__lte=5) @@ -313,28 +349,28 @@ class QuerySetSetOperationTests(TestCase): self.assertIs(qs1.union(qs2).exists(), True) captured_queries = context.captured_queries self.assertEqual(len(captured_queries), 1) - captured_sql = captured_queries[0]['sql'] + captured_sql = captured_queries[0]["sql"] self.assertNotIn( connection.ops.quote_name(Number._meta.pk.column), captured_sql, ) self.assertEqual( captured_sql.count(connection.ops.limit_offset_sql(None, 1)), - 3 if connection.features.supports_slicing_ordering_in_compound else 1 + 3 if connection.features.supports_slicing_ordering_in_compound else 1, ) def test_exists_union_empty_result(self): qs = Number.objects.filter(pk__in=[]) self.assertIs(qs.union(qs).exists(), False) - @skipUnlessDBFeature('supports_select_intersection') + @skipUnlessDBFeature("supports_select_intersection") def test_exists_intersection(self): qs1 = Number.objects.filter(num__gt=5) qs2 = Number.objects.filter(num__lt=5) self.assertIs(qs1.intersection(qs1).exists(), True) self.assertIs(qs1.intersection(qs2).exists(), False) - @skipUnlessDBFeature('supports_select_difference') + @skipUnlessDBFeature("supports_select_difference") def test_exists_difference(self): qs1 = Number.objects.filter(num__gte=5) qs2 = Number.objects.filter(num__gte=3) @@ -345,75 +381,79 @@ class QuerySetSetOperationTests(TestCase): qs = Number.objects.filter(num=2) self.assertEqual(qs.union(qs).get().num, 2) - @skipUnlessDBFeature('supports_select_difference') + @skipUnlessDBFeature("supports_select_difference") def test_get_difference(self): qs1 = Number.objects.all() qs2 = Number.objects.exclude(num=2) self.assertEqual(qs1.difference(qs2).get().num, 2) - @skipUnlessDBFeature('supports_select_intersection') + @skipUnlessDBFeature("supports_select_intersection") def test_get_intersection(self): qs1 = Number.objects.all() qs2 = Number.objects.filter(num=2) self.assertEqual(qs1.intersection(qs2).get().num, 2) - @skipUnlessDBFeature('supports_slicing_ordering_in_compound') + @skipUnlessDBFeature("supports_slicing_ordering_in_compound") def test_ordering_subqueries(self): - qs1 = Number.objects.order_by('num')[:2] - qs2 = Number.objects.order_by('-num')[:2] - self.assertNumbersEqual(qs1.union(qs2).order_by('-num')[:4], [9, 8, 1, 0]) + qs1 = Number.objects.order_by("num")[:2] + qs2 = Number.objects.order_by("-num")[:2] + self.assertNumbersEqual(qs1.union(qs2).order_by("-num")[:4], [9, 8, 1, 0]) - @skipIfDBFeature('supports_slicing_ordering_in_compound') + @skipIfDBFeature("supports_slicing_ordering_in_compound") def test_unsupported_ordering_slicing_raises_db_error(self): qs1 = Number.objects.all() qs2 = Number.objects.all() qs3 = Number.objects.all() - msg = 'LIMIT/OFFSET not allowed in subqueries of compound statements' + msg = "LIMIT/OFFSET not allowed in subqueries of compound statements" with self.assertRaisesMessage(DatabaseError, msg): list(qs1.union(qs2[:10])) - msg = 'ORDER BY not allowed in subqueries of compound statements' + msg = "ORDER BY not allowed in subqueries of compound statements" with self.assertRaisesMessage(DatabaseError, msg): - list(qs1.order_by('id').union(qs2)) + list(qs1.order_by("id").union(qs2)) with self.assertRaisesMessage(DatabaseError, msg): - list(qs1.union(qs2).order_by('id').union(qs3)) + list(qs1.union(qs2).order_by("id").union(qs3)) - @skipIfDBFeature('supports_select_intersection') + @skipIfDBFeature("supports_select_intersection") def test_unsupported_intersection_raises_db_error(self): qs1 = Number.objects.all() qs2 = Number.objects.all() - msg = 'intersection is not supported on this database backend' + msg = "intersection is not supported on this database backend" with self.assertRaisesMessage(NotSupportedError, msg): list(qs1.intersection(qs2)) def test_combining_multiple_models(self): - ReservedName.objects.create(name='99 little bugs', order=99) - qs1 = Number.objects.filter(num=1).values_list('num', flat=True) - qs2 = ReservedName.objects.values_list('order') - self.assertEqual(list(qs1.union(qs2).order_by('num')), [1, 99]) + ReservedName.objects.create(name="99 little bugs", order=99) + qs1 = Number.objects.filter(num=1).values_list("num", flat=True) + qs2 = ReservedName.objects.values_list("order") + self.assertEqual(list(qs1.union(qs2).order_by("num")), [1, 99]) def test_order_raises_on_non_selected_column(self): - qs1 = Number.objects.filter().annotate( - annotation=Value(1, IntegerField()), - ).values('annotation', num2=F('num')) - qs2 = Number.objects.filter().values('id', 'num') + qs1 = ( + Number.objects.filter() + .annotate( + annotation=Value(1, IntegerField()), + ) + .values("annotation", num2=F("num")) + ) + qs2 = Number.objects.filter().values("id", "num") # Should not raise - list(qs1.union(qs2).order_by('annotation')) - list(qs1.union(qs2).order_by('num2')) - msg = 'ORDER BY term does not match any column in the result set' + list(qs1.union(qs2).order_by("annotation")) + list(qs1.union(qs2).order_by("num2")) + msg = "ORDER BY term does not match any column in the result set" # 'id' is not part of the select with self.assertRaisesMessage(DatabaseError, msg): - list(qs1.union(qs2).order_by('id')) + list(qs1.union(qs2).order_by("id")) # 'num' got realiased to num2 with self.assertRaisesMessage(DatabaseError, msg): - list(qs1.union(qs2).order_by('num')) + list(qs1.union(qs2).order_by("num")) with self.assertRaisesMessage(DatabaseError, msg): - list(qs1.union(qs2).order_by(F('num'))) + list(qs1.union(qs2).order_by(F("num"))) with self.assertRaisesMessage(DatabaseError, msg): - list(qs1.union(qs2).order_by(F('num').desc())) + list(qs1.union(qs2).order_by(F("num").desc())) # switched order, now 'exists' again: - list(qs2.union(qs1).order_by('num')) + list(qs2.union(qs1).order_by("num")) - @skipUnlessDBFeature('supports_select_difference', 'supports_select_intersection') + @skipUnlessDBFeature("supports_select_difference", "supports_select_intersection") def test_qs_with_subcompound_qs(self): qs1 = Number.objects.all() qs2 = Number.objects.intersection(Number.objects.filter(num__gt=1)) @@ -423,31 +463,31 @@ class QuerySetSetOperationTests(TestCase): qs = Number.objects.all() union = qs.union(qs) numbers = list(range(10)) - self.assertNumbersEqual(union.order_by('num'), numbers) - self.assertNumbersEqual(union.order_by('other_num'), reversed(numbers)) + self.assertNumbersEqual(union.order_by("num"), numbers) + self.assertNumbersEqual(union.order_by("other_num"), reversed(numbers)) def test_unsupported_operations_on_combined_qs(self): qs = Number.objects.all() - msg = 'Calling QuerySet.%s() after %s() is not supported.' - combinators = ['union'] + msg = "Calling QuerySet.%s() after %s() is not supported." + combinators = ["union"] if connection.features.supports_select_difference: - combinators.append('difference') + combinators.append("difference") if connection.features.supports_select_intersection: - combinators.append('intersection') + combinators.append("intersection") for combinator in combinators: for operation in ( - 'alias', - 'annotate', - 'defer', - 'delete', - 'distinct', - 'exclude', - 'extra', - 'filter', - 'only', - 'prefetch_related', - 'select_related', - 'update', + "alias", + "annotate", + "defer", + "delete", + "distinct", + "exclude", + "extra", + "filter", + "only", + "prefetch_related", + "select_related", + "update", ): with self.subTest(combinator=combinator, operation=operation): with self.assertRaisesMessage( @@ -457,19 +497,19 @@ class QuerySetSetOperationTests(TestCase): getattr(getattr(qs, combinator)(qs), operation)() with self.assertRaisesMessage( NotSupportedError, - msg % ('contains', combinator), + msg % ("contains", combinator), ): obj = Number.objects.first() getattr(qs, combinator)(qs).contains(obj) def test_get_with_filters_unsupported_on_combined_qs(self): qs = Number.objects.all() - msg = 'Calling QuerySet.get(...) with filters after %s() is not supported.' - combinators = ['union'] + msg = "Calling QuerySet.get(...) with filters after %s() is not supported." + combinators = ["union"] if connection.features.supports_select_difference: - combinators.append('difference') + combinators.append("difference") if connection.features.supports_select_intersection: - combinators.append('intersection') + combinators.append("intersection") for combinator in combinators: with self.subTest(combinator=combinator): with self.assertRaisesMessage(NotSupportedError, msg % combinator): @@ -477,15 +517,15 @@ class QuerySetSetOperationTests(TestCase): def test_operator_on_combined_qs_error(self): qs = Number.objects.all() - msg = 'Cannot use %s operator with combined queryset.' - combinators = ['union'] + msg = "Cannot use %s operator with combined queryset." + combinators = ["union"] if connection.features.supports_select_difference: - combinators.append('difference') + combinators.append("difference") if connection.features.supports_select_intersection: - combinators.append('intersection') + combinators.append("intersection") operators = [ - ('|', operator.or_), - ('&', operator.and_), + ("|", operator.or_), + ("&", operator.and_), ] for combinator in combinators: combined_qs = getattr(qs, combinator)(qs) diff --git a/tests/queries/test_query.py b/tests/queries/test_query.py index 6ed766e3ae..e69d569ac3 100644 --- a/tests/queries/test_query.py +++ b/tests/queries/test_query.py @@ -21,11 +21,11 @@ class TestQuery(SimpleTestCase): lookup = where.children[0] self.assertIsInstance(lookup, GreaterThan) self.assertEqual(lookup.rhs, 2) - self.assertEqual(lookup.lhs.target, Author._meta.get_field('num')) + self.assertEqual(lookup.lhs.target, Author._meta.get_field("num")) def test_non_alias_cols_query(self): query = Query(Author, alias_cols=False) - where = query.build_where(Q(num__gt=2, name__isnull=False) | Q(num__lt=F('id'))) + where = query.build_where(Q(num__gt=2, name__isnull=False) | Q(num__lt=F("id"))) name_isnull_lookup, num_gt_lookup = where.children[0].children self.assertIsInstance(num_gt_lookup, GreaterThan) @@ -50,35 +50,35 @@ class TestQuery(SimpleTestCase): lookup = where.children[0] self.assertIsInstance(lookup, GreaterThan) self.assertEqual(lookup.rhs, 2) - self.assertEqual(lookup.lhs.target, Author._meta.get_field('num')) + self.assertEqual(lookup.lhs.target, Author._meta.get_field("num")) lookup = where.children[1] self.assertIsInstance(lookup, LessThan) self.assertEqual(lookup.rhs, 0) - self.assertEqual(lookup.lhs.target, Author._meta.get_field('num')) + self.assertEqual(lookup.lhs.target, Author._meta.get_field("num")) def test_multiple_fields(self): query = Query(Item, alias_cols=False) - where = query.build_where(Q(modified__gt=F('created'))) + where = query.build_where(Q(modified__gt=F("created"))) lookup = where.children[0] self.assertIsInstance(lookup, GreaterThan) self.assertIsInstance(lookup.rhs, Col) self.assertIsNone(lookup.rhs.alias) self.assertIsInstance(lookup.lhs, Col) self.assertIsNone(lookup.lhs.alias) - self.assertEqual(lookup.rhs.target, Item._meta.get_field('created')) - self.assertEqual(lookup.lhs.target, Item._meta.get_field('modified')) + self.assertEqual(lookup.rhs.target, Item._meta.get_field("created")) + self.assertEqual(lookup.lhs.target, Item._meta.get_field("modified")) def test_transform(self): query = Query(Author, alias_cols=False) with register_lookup(CharField, Lower): - where = query.build_where(~Q(name__lower='foo')) + where = query.build_where(~Q(name__lower="foo")) lookup = where.children[0] self.assertIsInstance(lookup, Exact) self.assertIsInstance(lookup.lhs, Lower) self.assertIsInstance(lookup.lhs.lhs, Col) self.assertIsNone(lookup.lhs.lhs.alias) - self.assertEqual(lookup.lhs.lhs.target, Author._meta.get_field('name')) + self.assertEqual(lookup.lhs.lhs.target, Author._meta.get_field("name")) def test_negated_nullable(self): query = Query(Item) @@ -86,21 +86,21 @@ class TestQuery(SimpleTestCase): self.assertTrue(where.negated) lookup = where.children[0] self.assertIsInstance(lookup, LessThan) - self.assertEqual(lookup.lhs.target, Item._meta.get_field('modified')) + self.assertEqual(lookup.lhs.target, Item._meta.get_field("modified")) lookup = where.children[1] self.assertIsInstance(lookup, IsNull) - self.assertEqual(lookup.lhs.target, Item._meta.get_field('modified')) + self.assertEqual(lookup.lhs.target, Item._meta.get_field("modified")) def test_foreign_key(self): query = Query(Item) - msg = 'Joined field references are not permitted in this query' + msg = "Joined field references are not permitted in this query" with self.assertRaisesMessage(FieldError, msg): query.build_where(Q(creator__num__gt=2)) def test_foreign_key_f(self): query = Query(Ranking) with self.assertRaises(FieldError): - query.build_where(Q(rank__gt=F('author__num'))) + query.build_where(Q(rank__gt=F("author__num"))) def test_foreign_key_exclusive(self): query = Query(ObjectC, alias_cols=False) @@ -109,23 +109,23 @@ class TestQuery(SimpleTestCase): self.assertIsInstance(a_isnull, RelatedIsNull) self.assertIsInstance(a_isnull.lhs, Col) self.assertIsNone(a_isnull.lhs.alias) - self.assertEqual(a_isnull.lhs.target, ObjectC._meta.get_field('objecta')) + self.assertEqual(a_isnull.lhs.target, ObjectC._meta.get_field("objecta")) b_isnull = where.children[1] self.assertIsInstance(b_isnull, RelatedIsNull) self.assertIsInstance(b_isnull.lhs, Col) self.assertIsNone(b_isnull.lhs.alias) - self.assertEqual(b_isnull.lhs.target, ObjectC._meta.get_field('objectb')) + self.assertEqual(b_isnull.lhs.target, ObjectC._meta.get_field("objectb")) def test_clone_select_related(self): query = Query(Item) - query.add_select_related(['creator']) + query.add_select_related(["creator"]) clone = query.clone() - clone.add_select_related(['note', 'creator__extra']) - self.assertEqual(query.select_related, {'creator': {}}) + clone.add_select_related(["note", "creator__extra"]) + self.assertEqual(query.select_related, {"creator": {}}) def test_iterable_lookup_value(self): query = Query(Item) - where = query.build_where(Q(name=['a', 'b'])) + where = query.build_where(Q(name=["a", "b"])) name_exact = where.children[0] self.assertIsInstance(name_exact, Exact) self.assertEqual(name_exact.rhs, "['a', 'b']") @@ -140,14 +140,14 @@ class TestQuery(SimpleTestCase): def test_filter_conditional_join(self): query = Query(Item) - filter_expr = Func('note__note', output_field=BooleanField()) - msg = 'Joined field references are not permitted in this query' + filter_expr = Func("note__note", output_field=BooleanField()) + msg = "Joined field references are not permitted in this query" with self.assertRaisesMessage(FieldError, msg): query.build_where(filter_expr) def test_filter_non_conditional(self): query = Query(Item) - msg = 'Cannot filter against a non-conditional expression.' + msg = "Cannot filter against a non-conditional expression." with self.assertRaisesMessage(TypeError, msg): query.build_where(Func(output_field=CharField())) @@ -155,6 +155,6 @@ class TestQuery(SimpleTestCase): class JoinPromoterTest(SimpleTestCase): def test_repr(self): self.assertEqual( - repr(JoinPromoter('AND', 3, True)), + repr(JoinPromoter("AND", 3, True)), "JoinPromoter(connector='AND', num_children=3, negated=True)", ) diff --git a/tests/queries/test_sqlcompiler.py b/tests/queries/test_sqlcompiler.py index 3116429c05..72a66ad07c 100644 --- a/tests/queries/test_sqlcompiler.py +++ b/tests/queries/test_sqlcompiler.py @@ -13,5 +13,5 @@ class SQLCompilerTest(SimpleTestCase): repr(compiler), f"<SQLCompiler model=Item connection=" f"<DatabaseWrapper vendor={connection.vendor!r} alias='default'> " - f"using='default'>" + f"using='default'>", ) diff --git a/tests/queries/tests.py b/tests/queries/tests.py index 09aaad95a9..e1fd8b24b6 100644 --- a/tests/queries/tests.py +++ b/tests/queries/tests.py @@ -15,21 +15,99 @@ from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature from django.test.utils import CaptureQueriesContext from .models import ( - FK1, Annotation, Article, Author, BaseA, BaseUser, Book, CategoryItem, - CategoryRelationship, Celebrity, Channel, Chapter, Child, ChildObjectA, - Classroom, CommonMixedCaseForeignKeys, Company, Cover, CustomPk, - CustomPkTag, DateTimePK, Detail, DumbCategory, Eaten, Employment, - ExtraInfo, Fan, Food, Identifier, Individual, Item, Job, - JobResponsibilities, Join, LeafA, LeafB, LoopX, LoopZ, ManagedModel, - Member, MixedCaseDbColumnCategoryItem, MixedCaseFieldCategoryItem, ModelA, - ModelB, ModelC, ModelD, MyObject, NamedCategory, Node, Note, NullableName, - Number, ObjectA, ObjectB, ObjectC, OneToOneCategory, Order, OrderItem, - Page, Paragraph, Person, Plaything, PointerA, Program, ProxyCategory, - ProxyObjectA, ProxyObjectB, Ranking, Related, RelatedIndividual, - RelatedObject, Report, ReportComment, ReservedName, Responsibility, School, - SharedConnection, SimpleCategory, SingleObject, SpecialCategory, Staff, - StaffUser, Student, Tag, Task, Teacher, Ticket21203Child, - Ticket21203Parent, Ticket23605A, Ticket23605B, Ticket23605C, TvChef, Valid, + FK1, + Annotation, + Article, + Author, + BaseA, + BaseUser, + Book, + CategoryItem, + CategoryRelationship, + Celebrity, + Channel, + Chapter, + Child, + ChildObjectA, + Classroom, + CommonMixedCaseForeignKeys, + Company, + Cover, + CustomPk, + CustomPkTag, + DateTimePK, + Detail, + DumbCategory, + Eaten, + Employment, + ExtraInfo, + Fan, + Food, + Identifier, + Individual, + Item, + Job, + JobResponsibilities, + Join, + LeafA, + LeafB, + LoopX, + LoopZ, + ManagedModel, + Member, + MixedCaseDbColumnCategoryItem, + MixedCaseFieldCategoryItem, + ModelA, + ModelB, + ModelC, + ModelD, + MyObject, + NamedCategory, + Node, + Note, + NullableName, + Number, + ObjectA, + ObjectB, + ObjectC, + OneToOneCategory, + Order, + OrderItem, + Page, + Paragraph, + Person, + Plaything, + PointerA, + Program, + ProxyCategory, + ProxyObjectA, + ProxyObjectB, + Ranking, + Related, + RelatedIndividual, + RelatedObject, + Report, + ReportComment, + ReservedName, + Responsibility, + School, + SharedConnection, + SimpleCategory, + SingleObject, + SpecialCategory, + Staff, + StaffUser, + Student, + Tag, + Task, + Teacher, + Ticket21203Child, + Ticket21203Parent, + Ticket23605A, + Ticket23605B, + Ticket23605C, + TvChef, + Valid, X, ) @@ -38,46 +116,60 @@ class Queries1Tests(TestCase): @classmethod def setUpTestData(cls): cls.nc1 = generic = NamedCategory.objects.create(name="Generic") - cls.t1 = Tag.objects.create(name='t1', category=generic) - cls.t2 = Tag.objects.create(name='t2', parent=cls.t1, category=generic) - cls.t3 = Tag.objects.create(name='t3', parent=cls.t1) - cls.t4 = Tag.objects.create(name='t4', parent=cls.t3) - cls.t5 = Tag.objects.create(name='t5', parent=cls.t3) + cls.t1 = Tag.objects.create(name="t1", category=generic) + cls.t2 = Tag.objects.create(name="t2", parent=cls.t1, category=generic) + cls.t3 = Tag.objects.create(name="t3", parent=cls.t1) + cls.t4 = Tag.objects.create(name="t4", parent=cls.t3) + cls.t5 = Tag.objects.create(name="t5", parent=cls.t3) - cls.n1 = Note.objects.create(note='n1', misc='foo', id=1) - cls.n2 = Note.objects.create(note='n2', misc='bar', id=2) - cls.n3 = Note.objects.create(note='n3', misc='foo', id=3, negate=False) + cls.n1 = Note.objects.create(note="n1", misc="foo", id=1) + cls.n2 = Note.objects.create(note="n2", misc="bar", id=2) + cls.n3 = Note.objects.create(note="n3", misc="foo", id=3, negate=False) - cls.ann1 = Annotation.objects.create(name='a1', tag=cls.t1) + cls.ann1 = Annotation.objects.create(name="a1", tag=cls.t1) cls.ann1.notes.add(cls.n1) - ann2 = Annotation.objects.create(name='a2', tag=cls.t4) + ann2 = Annotation.objects.create(name="a2", tag=cls.t4) ann2.notes.add(cls.n2, cls.n3) # Create these out of order so that sorting by 'id' will be different to sorting # by 'info'. Helps detect some problems later. - cls.e2 = ExtraInfo.objects.create(info='e2', note=cls.n2, value=41, filterable=False) - e1 = ExtraInfo.objects.create(info='e1', note=cls.n1, value=42) + cls.e2 = ExtraInfo.objects.create( + info="e2", note=cls.n2, value=41, filterable=False + ) + e1 = ExtraInfo.objects.create(info="e1", note=cls.n1, value=42) - cls.a1 = Author.objects.create(name='a1', num=1001, extra=e1) - cls.a2 = Author.objects.create(name='a2', num=2002, extra=e1) - cls.a3 = Author.objects.create(name='a3', num=3003, extra=cls.e2) - cls.a4 = Author.objects.create(name='a4', num=4004, extra=cls.e2) + cls.a1 = Author.objects.create(name="a1", num=1001, extra=e1) + cls.a2 = Author.objects.create(name="a2", num=2002, extra=e1) + cls.a3 = Author.objects.create(name="a3", num=3003, extra=cls.e2) + cls.a4 = Author.objects.create(name="a4", num=4004, extra=cls.e2) cls.time1 = datetime.datetime(2007, 12, 19, 22, 25, 0) cls.time2 = datetime.datetime(2007, 12, 19, 21, 0, 0) time3 = datetime.datetime(2007, 12, 20, 22, 25, 0) time4 = datetime.datetime(2007, 12, 20, 21, 0, 0) - cls.i1 = Item.objects.create(name='one', created=cls.time1, modified=cls.time1, creator=cls.a1, note=cls.n3) + cls.i1 = Item.objects.create( + name="one", + created=cls.time1, + modified=cls.time1, + creator=cls.a1, + note=cls.n3, + ) cls.i1.tags.set([cls.t1, cls.t2]) - cls.i2 = Item.objects.create(name='two', created=cls.time2, creator=cls.a2, note=cls.n2) + cls.i2 = Item.objects.create( + name="two", created=cls.time2, creator=cls.a2, note=cls.n2 + ) cls.i2.tags.set([cls.t1, cls.t3]) - cls.i3 = Item.objects.create(name='three', created=time3, creator=cls.a2, note=cls.n3) - cls.i4 = Item.objects.create(name='four', created=time4, creator=cls.a4, note=cls.n3) + cls.i3 = Item.objects.create( + name="three", created=time3, creator=cls.a2, note=cls.n3 + ) + cls.i4 = Item.objects.create( + name="four", created=time4, creator=cls.a4, note=cls.n3 + ) cls.i4.tags.set([cls.t4]) - cls.r1 = Report.objects.create(name='r1', creator=cls.a1) - cls.r2 = Report.objects.create(name='r2', creator=cls.a3) - cls.r3 = Report.objects.create(name='r3') + cls.r1 = Report.objects.create(name="r1", creator=cls.a1) + cls.r2 = Report.objects.create(name="r2", creator=cls.a3) + cls.r3 = Report.objects.create(name="r3") # Ordering by 'rank' gives us rank2, rank1, rank3. Ordering by the Meta.ordering # will be rank3, rank2, rank1. @@ -90,14 +182,14 @@ class Queries1Tests(TestCase): qs1 = Tag.objects.filter(pk__lte=0) qs2 = Tag.objects.filter(parent__in=qs1) qs3 = Tag.objects.filter(parent__in=qs2) - self.assertEqual(qs3.query.subq_aliases, {'T', 'U', 'V'}) - self.assertIn('v0', str(qs3.query).lower()) + self.assertEqual(qs3.query.subq_aliases, {"T", "U", "V"}) + self.assertIn("v0", str(qs3.query).lower()) qs4 = qs3.filter(parent__in=qs1) - self.assertEqual(qs4.query.subq_aliases, {'T', 'U', 'V'}) + self.assertEqual(qs4.query.subq_aliases, {"T", "U", "V"}) # It is possible to reuse U for the second subquery, no need to use W. - self.assertNotIn('w0', str(qs4.query).lower()) + self.assertNotIn("w0", str(qs4.query).lower()) # So, 'U0."id"' is referenced in SELECT and WHERE twice. - self.assertEqual(str(qs4.query).lower().count('u0.'), 4) + self.assertEqual(str(qs4.query).lower().count("u0."), 4) def test_ticket1050(self): self.assertSequenceEqual( @@ -129,7 +221,7 @@ class Queries1Tests(TestCase): self.assertNotIn(LOUTER, [x.join_type for x in query.alias_map.values()]) self.assertSequenceEqual( - Item.objects.filter(Q(tags=self.t1)).order_by('name'), + Item.objects.filter(Q(tags=self.t1)).order_by("name"), [self.i1, self.i2], ) self.assertSequenceEqual( @@ -137,7 +229,9 @@ class Queries1Tests(TestCase): [self.i1], ) self.assertSequenceEqual( - Item.objects.filter(Q(tags=self.t1)).filter(Q(creator__name='fred') | Q(tags=self.t2)), + Item.objects.filter(Q(tags=self.t1)).filter( + Q(creator__name="fred") | Q(tags=self.t2) + ), [self.i1], ) @@ -145,12 +239,13 @@ class Queries1Tests(TestCase): # different from the previous example as it tries to find tags that are two # things at once (rather than two tags). self.assertSequenceEqual( - Item.objects.filter(Q(tags=self.t1) & Q(tags=self.t2)), - [] + Item.objects.filter(Q(tags=self.t1) & Q(tags=self.t2)), [] ) self.assertSequenceEqual( - Item.objects.filter(Q(tags=self.t1), Q(creator__name='fred') | Q(tags=self.t2)), - [] + Item.objects.filter( + Q(tags=self.t1), Q(creator__name="fred") | Q(tags=self.t2) + ), + [], ) qs = Author.objects.filter(ranking__rank=2, ranking__id=self.rank1.id) @@ -165,7 +260,9 @@ class Queries1Tests(TestCase): [self.i1], ) self.assertSequenceEqual( - Item.objects.filter(tags__in=[self.t1, self.t2]).distinct().order_by('name'), + Item.objects.filter(tags__in=[self.t1, self.t2]) + .distinct() + .order_by("name"), [self.i1, self.i2], ) self.assertSequenceEqual( @@ -175,29 +272,31 @@ class Queries1Tests(TestCase): # Make sure .distinct() works with slicing (this was broken in Oracle). self.assertSequenceEqual( - Item.objects.filter(tags__in=[self.t1, self.t2]).order_by('name')[:3], + Item.objects.filter(tags__in=[self.t1, self.t2]).order_by("name")[:3], [self.i1, self.i1, self.i2], ) self.assertSequenceEqual( - Item.objects.filter(tags__in=[self.t1, self.t2]).distinct().order_by('name')[:3], + Item.objects.filter(tags__in=[self.t1, self.t2]) + .distinct() + .order_by("name")[:3], [self.i1, self.i2], ) def test_tickets_2080_3592(self): self.assertSequenceEqual( - Author.objects.filter(item__name='one') | Author.objects.filter(name='a3'), + Author.objects.filter(item__name="one") | Author.objects.filter(name="a3"), [self.a1, self.a3], ) self.assertSequenceEqual( - Author.objects.filter(Q(item__name='one') | Q(name='a3')), + Author.objects.filter(Q(item__name="one") | Q(name="a3")), [self.a1, self.a3], ) self.assertSequenceEqual( - Author.objects.filter(Q(name='a3') | Q(item__name='one')), + Author.objects.filter(Q(name="a3") | Q(item__name="one")), [self.a1, self.a3], ) self.assertSequenceEqual( - Author.objects.filter(Q(item__name='three') | Q(report__name='r3')), + Author.objects.filter(Q(item__name="three") | Q(report__name="r3")), [self.a2], ) @@ -205,114 +304,112 @@ class Queries1Tests(TestCase): # Merging two empty result sets shouldn't leave a queryset with no constraints # (which would match everything). self.assertSequenceEqual(Author.objects.filter(Q(id__in=[])), []) - self.assertSequenceEqual( - Author.objects.filter(Q(id__in=[]) | Q(id__in=[])), - [] - ) + self.assertSequenceEqual(Author.objects.filter(Q(id__in=[]) | Q(id__in=[])), []) def test_tickets_1878_2939(self): - self.assertEqual(Item.objects.values('creator').distinct().count(), 3) + self.assertEqual(Item.objects.values("creator").distinct().count(), 3) # Create something with a duplicate 'name' so that we can test multi-column # cases (which require some tricky SQL transformations under the covers). - xx = Item(name='four', created=self.time1, creator=self.a2, note=self.n1) + xx = Item(name="four", created=self.time1, creator=self.a2, note=self.n1) xx.save() self.assertEqual( - Item.objects.exclude(name='two').values('creator', 'name').distinct().count(), - 4 + Item.objects.exclude(name="two") + .values("creator", "name") + .distinct() + .count(), + 4, ) self.assertEqual( ( - Item.objects - .exclude(name='two') - .extra(select={'foo': '%s'}, select_params=(1,)) - .values('creator', 'name', 'foo') + Item.objects.exclude(name="two") + .extra(select={"foo": "%s"}, select_params=(1,)) + .values("creator", "name", "foo") .distinct() .count() ), - 4 + 4, ) self.assertEqual( ( - Item.objects - .exclude(name='two') - .extra(select={'foo': '%s'}, select_params=(1,)) - .values('creator', 'name') + Item.objects.exclude(name="two") + .extra(select={"foo": "%s"}, select_params=(1,)) + .values("creator", "name") .distinct() .count() ), - 4 + 4, ) xx.delete() def test_ticket7323(self): - self.assertEqual(Item.objects.values('creator', 'name').count(), 4) + self.assertEqual(Item.objects.values("creator", "name").count(), 4) def test_ticket2253(self): - q1 = Item.objects.order_by('name') + q1 = Item.objects.order_by("name") q2 = Item.objects.filter(id=self.i1.id) self.assertSequenceEqual(q1, [self.i4, self.i1, self.i3, self.i2]) self.assertSequenceEqual(q2, [self.i1]) self.assertSequenceEqual( - (q1 | q2).order_by('name'), + (q1 | q2).order_by("name"), [self.i4, self.i1, self.i3, self.i2], ) - self.assertSequenceEqual((q1 & q2).order_by('name'), [self.i1]) + self.assertSequenceEqual((q1 & q2).order_by("name"), [self.i1]) q1 = Item.objects.filter(tags=self.t1) q2 = Item.objects.filter(note=self.n3, tags=self.t2) q3 = Item.objects.filter(creator=self.a4) self.assertSequenceEqual( - ((q1 & q2) | q3).order_by('name'), + ((q1 & q2) | q3).order_by("name"), [self.i4, self.i1], ) def test_order_by_tables(self): - q1 = Item.objects.order_by('name') + q1 = Item.objects.order_by("name") q2 = Item.objects.filter(id=self.i1.id) list(q2) - combined_query = (q1 & q2).order_by('name').query - self.assertEqual(len([ - t for t in combined_query.alias_map if combined_query.alias_refcount[t] - ]), 1) + combined_query = (q1 & q2).order_by("name").query + self.assertEqual( + len( + [ + t + for t in combined_query.alias_map + if combined_query.alias_refcount[t] + ] + ), + 1, + ) def test_order_by_join_unref(self): """ This test is related to the above one, testing that there aren't old JOINs in the query. """ - qs = Celebrity.objects.order_by('greatest_fan__fan_of') - self.assertIn('OUTER JOIN', str(qs.query)) - qs = qs.order_by('id') - self.assertNotIn('OUTER JOIN', str(qs.query)) + qs = Celebrity.objects.order_by("greatest_fan__fan_of") + self.assertIn("OUTER JOIN", str(qs.query)) + qs = qs.order_by("id") + self.assertNotIn("OUTER JOIN", str(qs.query)) def test_get_clears_ordering(self): """ get() should clear ordering for optimization purposes. """ with CaptureQueriesContext(connection) as captured_queries: - Author.objects.order_by('name').get(pk=self.a1.pk) - self.assertNotIn('order by', captured_queries[0]['sql'].lower()) + Author.objects.order_by("name").get(pk=self.a1.pk) + self.assertNotIn("order by", captured_queries[0]["sql"].lower()) def test_tickets_4088_4306(self): self.assertSequenceEqual(Report.objects.filter(creator=1001), [self.r1]) - self.assertSequenceEqual( - Report.objects.filter(creator__num=1001), - [self.r1] - ) + self.assertSequenceEqual(Report.objects.filter(creator__num=1001), [self.r1]) self.assertSequenceEqual(Report.objects.filter(creator__id=1001), []) self.assertSequenceEqual( - Report.objects.filter(creator__id=self.a1.id), - [self.r1] - ) - self.assertSequenceEqual( - Report.objects.filter(creator__name='a1'), - [self.r1] + Report.objects.filter(creator__id=self.a1.id), [self.r1] ) + self.assertSequenceEqual(Report.objects.filter(creator__name="a1"), [self.r1]) def test_ticket4510(self): self.assertSequenceEqual( - Author.objects.filter(report__name='r1'), + Author.objects.filter(report__name="r1"), [self.a1], ) @@ -321,30 +418,30 @@ class Queries1Tests(TestCase): def test_tickets_5324_6704(self): self.assertSequenceEqual( - Item.objects.filter(tags__name='t4'), + Item.objects.filter(tags__name="t4"), [self.i4], ) self.assertSequenceEqual( - Item.objects.exclude(tags__name='t4').order_by('name').distinct(), + Item.objects.exclude(tags__name="t4").order_by("name").distinct(), [self.i1, self.i3, self.i2], ) self.assertSequenceEqual( - Item.objects.exclude(tags__name='t4').order_by('name').distinct().reverse(), + Item.objects.exclude(tags__name="t4").order_by("name").distinct().reverse(), [self.i2, self.i3, self.i1], ) self.assertSequenceEqual( - Author.objects.exclude(item__name='one').distinct().order_by('name'), + Author.objects.exclude(item__name="one").distinct().order_by("name"), [self.a2, self.a3, self.a4], ) # Excluding across a m2m relation when there is more than one related # object associated was problematic. self.assertSequenceEqual( - Item.objects.exclude(tags__name='t1').order_by('name'), + Item.objects.exclude(tags__name="t1").order_by("name"), [self.i4, self.i3], ) self.assertSequenceEqual( - Item.objects.exclude(tags__name='t1').exclude(tags__name='t4'), + Item.objects.exclude(tags__name="t1").exclude(tags__name="t4"), [self.i3], ) @@ -356,49 +453,57 @@ class Queries1Tests(TestCase): # values (Author -> ExtraInfo, in the following), it should never be # promoted to a left outer join. So the following query should only # involve one "left outer" join (Author -> Item is 0-to-many). - qs = Author.objects.filter(id=self.a1.id).filter(Q(extra__note=self.n1) | Q(item__note=self.n3)) + qs = Author.objects.filter(id=self.a1.id).filter( + Q(extra__note=self.n1) | Q(item__note=self.n3) + ) self.assertEqual( - len([ - x for x in qs.query.alias_map.values() - if x.join_type == LOUTER and qs.query.alias_refcount[x.table_alias] - ]), - 1 + len( + [ + x + for x in qs.query.alias_map.values() + if x.join_type == LOUTER and qs.query.alias_refcount[x.table_alias] + ] + ), + 1, ) # The previous changes shouldn't affect nullable foreign key joins. self.assertSequenceEqual( - Tag.objects.filter(parent__isnull=True).order_by('name'), - [self.t1] + Tag.objects.filter(parent__isnull=True).order_by("name"), [self.t1] ) self.assertSequenceEqual( - Tag.objects.exclude(parent__isnull=True).order_by('name'), + Tag.objects.exclude(parent__isnull=True).order_by("name"), [self.t2, self.t3, self.t4, self.t5], ) self.assertSequenceEqual( - Tag.objects.exclude(Q(parent__name='t1') | Q(parent__isnull=True)).order_by('name'), + Tag.objects.exclude(Q(parent__name="t1") | Q(parent__isnull=True)).order_by( + "name" + ), [self.t4, self.t5], ) self.assertSequenceEqual( - Tag.objects.exclude(Q(parent__isnull=True) | Q(parent__name='t1')).order_by('name'), + Tag.objects.exclude(Q(parent__isnull=True) | Q(parent__name="t1")).order_by( + "name" + ), [self.t4, self.t5], ) self.assertSequenceEqual( - Tag.objects.exclude(Q(parent__parent__isnull=True)).order_by('name'), + Tag.objects.exclude(Q(parent__parent__isnull=True)).order_by("name"), [self.t4, self.t5], ) self.assertSequenceEqual( - Tag.objects.filter(~Q(parent__parent__isnull=True)).order_by('name'), + Tag.objects.filter(~Q(parent__parent__isnull=True)).order_by("name"), [self.t4, self.t5], ) def test_ticket2091(self): - t = Tag.objects.get(name='t4') + t = Tag.objects.get(name="t4") self.assertSequenceEqual(Item.objects.filter(tags__in=[t]), [self.i4]) def test_avoid_infinite_loop_on_too_many_subqueries(self): x = Tag.objects.filter(pk=1) local_recursion_limit = sys.getrecursionlimit() // 16 - msg = 'Maximum recursion depth exceeded: too many subqueries.' + msg = "Maximum recursion depth exceeded: too many subqueries." with self.assertRaisesMessage(RecursionError, msg): for i in range(local_recursion_limit + 2): x = Tag.objects.filter(pk__in=x) @@ -408,26 +513,45 @@ class Queries1Tests(TestCase): for _ in range(20): x = Tag.objects.filter(pk__in=x) self.assertEqual( - x.query.subq_aliases, { - 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'AA', 'AB', 'AC', 'AD', - 'AE', 'AF', 'AG', 'AH', 'AI', 'AJ', 'AK', 'AL', 'AM', 'AN', - } + x.query.subq_aliases, + { + "T", + "U", + "V", + "W", + "X", + "Y", + "Z", + "AA", + "AB", + "AC", + "AD", + "AE", + "AF", + "AG", + "AH", + "AI", + "AJ", + "AK", + "AL", + "AM", + "AN", + }, ) def test_heterogeneous_qs_combination(self): # Combining querysets built on different models should behave in a well-defined # fashion. We raise an error. - msg = 'Cannot combine queries on two different base models.' + msg = "Cannot combine queries on two different base models." with self.assertRaisesMessage(TypeError, msg): Author.objects.all() & Tag.objects.all() with self.assertRaisesMessage(TypeError, msg): Author.objects.all() | Tag.objects.all() def test_ticket3141(self): - self.assertEqual(Author.objects.extra(select={'foo': '1'}).count(), 4) + self.assertEqual(Author.objects.extra(select={"foo": "1"}).count(), 4) self.assertEqual( - Author.objects.extra(select={'foo': '%s'}, select_params=(1,)).count(), - 4 + Author.objects.extra(select={"foo": "%s"}, select_params=(1,)).count(), 4 ) def test_ticket2400(self): @@ -442,26 +566,28 @@ class Queries1Tests(TestCase): def test_ticket2496(self): self.assertSequenceEqual( - Item.objects.extra(tables=['queries_author']).select_related().order_by('name')[:1], + Item.objects.extra(tables=["queries_author"]) + .select_related() + .order_by("name")[:1], [self.i4], ) def test_error_raised_on_filter_with_dictionary(self): - with self.assertRaisesMessage(FieldError, 'Cannot parse keyword query as dict'): - Note.objects.filter({'note': 'n1', 'misc': 'foo'}) + with self.assertRaisesMessage(FieldError, "Cannot parse keyword query as dict"): + Note.objects.filter({"note": "n1", "misc": "foo"}) def test_tickets_2076_7256(self): # Ordering on related tables should be possible, even if the table is # not otherwise involved. self.assertSequenceEqual( - Item.objects.order_by('note__note', 'name'), + Item.objects.order_by("note__note", "name"), [self.i2, self.i4, self.i1, self.i3], ) # Ordering on a related field should use the remote model's default # ordering as a final step. self.assertSequenceEqual( - Author.objects.order_by('extra', '-name'), + Author.objects.order_by("extra", "-name"), [self.a2, self.a1, self.a4, self.a3], ) @@ -472,7 +598,7 @@ class Queries1Tests(TestCase): # If the remote model does not have a default ordering, we order by its 'id' # field. self.assertSequenceEqual( - Item.objects.order_by('creator', 'name'), + Item.objects.order_by("creator", "name"), [self.i1, self.i3, self.i2, self.i4], ) @@ -480,29 +606,31 @@ class Queries1Tests(TestCase): # ForeignKey) is legal, but the results might not make sense. That # isn't Django's problem. Garbage in, garbage out. self.assertSequenceEqual( - Item.objects.filter(tags__isnull=False).order_by('tags', 'id'), + Item.objects.filter(tags__isnull=False).order_by("tags", "id"), [self.i1, self.i2, self.i1, self.i2, self.i4], ) # If we replace the default ordering, Django adjusts the required # tables automatically. Item normally requires a join with Note to do # the default ordering, but that isn't needed here. - qs = Item.objects.order_by('name') + qs = Item.objects.order_by("name") self.assertSequenceEqual(qs, [self.i4, self.i1, self.i3, self.i2]) self.assertEqual(len(qs.query.alias_map), 1) def test_tickets_2874_3002(self): - qs = Item.objects.select_related().order_by('note__note', 'name') + qs = Item.objects.select_related().order_by("note__note", "name") self.assertQuerysetEqual(qs, [self.i2, self.i4, self.i1, self.i3]) # This is also a good select_related() test because there are multiple # Note entries in the SQL. The two Note items should be different. - self.assertEqual(repr(qs[0].note), '<Note: n2>') - self.assertEqual(repr(qs[0].creator.extra.note), '<Note: n1>') + self.assertEqual(repr(qs[0].note), "<Note: n2>") + self.assertEqual(repr(qs[0].creator.extra.note), "<Note: n1>") def test_ticket3037(self): self.assertSequenceEqual( - Item.objects.filter(Q(creator__name='a3', name='two') | Q(creator__name='a4', name='four')), + Item.objects.filter( + Q(creator__name="a3", name="two") | Q(creator__name="a4", name="four") + ), [self.i4], ) @@ -513,8 +641,8 @@ class Queries1Tests(TestCase): # isn't a bug; it's a warning to be careful with the selection of # ordering columns. self.assertSequenceEqual( - Note.objects.values('misc').distinct().order_by('note', '-misc'), - [{'misc': 'foo'}, {'misc': 'bar'}, {'misc': 'foo'}] + Note.objects.values("misc").distinct().order_by("note", "-misc"), + [{"misc": "foo"}, {"misc": "bar"}, {"misc": "foo"}], ) def test_ticket4358(self): @@ -523,121 +651,146 @@ class Queries1Tests(TestCase): # able to pass "foo_id" in the fields list and have it work, too. We # actually allow both "foo" and "foo_id". # The *_id version is returned by default. - self.assertIn('note_id', ExtraInfo.objects.values()[0]) + self.assertIn("note_id", ExtraInfo.objects.values()[0]) # You can also pass it in explicitly. - self.assertSequenceEqual(ExtraInfo.objects.values('note_id'), [{'note_id': 1}, {'note_id': 2}]) + self.assertSequenceEqual( + ExtraInfo.objects.values("note_id"), [{"note_id": 1}, {"note_id": 2}] + ) # ...or use the field name. - self.assertSequenceEqual(ExtraInfo.objects.values('note'), [{'note': 1}, {'note': 2}]) + self.assertSequenceEqual( + ExtraInfo.objects.values("note"), [{"note": 1}, {"note": 2}] + ) def test_ticket6154(self): # Multiple filter statements are joined using "AND" all the time. self.assertSequenceEqual( - Author.objects.filter(id=self.a1.id).filter(Q(extra__note=self.n1) | Q(item__note=self.n3)), + Author.objects.filter(id=self.a1.id).filter( + Q(extra__note=self.n1) | Q(item__note=self.n3) + ), [self.a1], ) self.assertSequenceEqual( - Author.objects.filter(Q(extra__note=self.n1) | Q(item__note=self.n3)).filter(id=self.a1.id), + Author.objects.filter( + Q(extra__note=self.n1) | Q(item__note=self.n3) + ).filter(id=self.a1.id), [self.a1], ) def test_ticket6981(self): self.assertSequenceEqual( - Tag.objects.select_related('parent').order_by('name'), + Tag.objects.select_related("parent").order_by("name"), [self.t1, self.t2, self.t3, self.t4, self.t5], ) def test_ticket9926(self): self.assertSequenceEqual( - Tag.objects.select_related("parent", "category").order_by('name'), + Tag.objects.select_related("parent", "category").order_by("name"), [self.t1, self.t2, self.t3, self.t4, self.t5], ) self.assertSequenceEqual( - Tag.objects.select_related('parent', "parent__category").order_by('name'), + Tag.objects.select_related("parent", "parent__category").order_by("name"), [self.t1, self.t2, self.t3, self.t4, self.t5], ) def test_tickets_6180_6203(self): # Dates with limits and/or counts self.assertEqual(Item.objects.count(), 4) - self.assertEqual(Item.objects.datetimes('created', 'month').count(), 1) - self.assertEqual(Item.objects.datetimes('created', 'day').count(), 2) - self.assertEqual(len(Item.objects.datetimes('created', 'day')), 2) - self.assertEqual(Item.objects.datetimes('created', 'day')[0], datetime.datetime(2007, 12, 19, 0, 0)) + self.assertEqual(Item.objects.datetimes("created", "month").count(), 1) + self.assertEqual(Item.objects.datetimes("created", "day").count(), 2) + self.assertEqual(len(Item.objects.datetimes("created", "day")), 2) + self.assertEqual( + Item.objects.datetimes("created", "day")[0], + datetime.datetime(2007, 12, 19, 0, 0), + ) def test_tickets_7087_12242(self): # Dates with extra select columns self.assertSequenceEqual( - Item.objects.datetimes('created', 'day').extra(select={'a': 1}), - [datetime.datetime(2007, 12, 19, 0, 0), datetime.datetime(2007, 12, 20, 0, 0)], + Item.objects.datetimes("created", "day").extra(select={"a": 1}), + [ + datetime.datetime(2007, 12, 19, 0, 0), + datetime.datetime(2007, 12, 20, 0, 0), + ], ) self.assertSequenceEqual( - Item.objects.extra(select={'a': 1}).datetimes('created', 'day'), - [datetime.datetime(2007, 12, 19, 0, 0), datetime.datetime(2007, 12, 20, 0, 0)], + Item.objects.extra(select={"a": 1}).datetimes("created", "day"), + [ + datetime.datetime(2007, 12, 19, 0, 0), + datetime.datetime(2007, 12, 20, 0, 0), + ], ) name = "one" self.assertSequenceEqual( - Item.objects.datetimes('created', 'day').extra(where=['name=%s'], params=[name]), + Item.objects.datetimes("created", "day").extra( + where=["name=%s"], params=[name] + ), [datetime.datetime(2007, 12, 19, 0, 0)], ) self.assertSequenceEqual( - Item.objects.extra(where=['name=%s'], params=[name]).datetimes('created', 'day'), + Item.objects.extra(where=["name=%s"], params=[name]).datetimes( + "created", "day" + ), [datetime.datetime(2007, 12, 19, 0, 0)], ) def test_ticket7155(self): # Nullable dates self.assertSequenceEqual( - Item.objects.datetimes('modified', 'day'), + Item.objects.datetimes("modified", "day"), [datetime.datetime(2007, 12, 19, 0, 0)], ) def test_order_by_rawsql(self): self.assertSequenceEqual( - Item.objects.values('note__note').order_by( - RawSQL('queries_note.note', ()), - 'id', + Item.objects.values("note__note").order_by( + RawSQL("queries_note.note", ()), + "id", ), [ - {'note__note': 'n2'}, - {'note__note': 'n3'}, - {'note__note': 'n3'}, - {'note__note': 'n3'}, + {"note__note": "n2"}, + {"note__note": "n3"}, + {"note__note": "n3"}, + {"note__note": "n3"}, ], ) def test_ticket7096(self): # Make sure exclude() with multiple conditions continues to work. self.assertSequenceEqual( - Tag.objects.filter(parent=self.t1, name='t3').order_by('name'), + Tag.objects.filter(parent=self.t1, name="t3").order_by("name"), [self.t3], ) self.assertSequenceEqual( - Tag.objects.exclude(parent=self.t1, name='t3').order_by('name'), + Tag.objects.exclude(parent=self.t1, name="t3").order_by("name"), [self.t1, self.t2, self.t4, self.t5], ) self.assertSequenceEqual( - Item.objects.exclude(tags__name='t1', name='one').order_by('name').distinct(), + Item.objects.exclude(tags__name="t1", name="one") + .order_by("name") + .distinct(), [self.i4, self.i3, self.i2], ) self.assertSequenceEqual( - Item.objects.filter(name__in=['three', 'four']).exclude(tags__name='t1').order_by('name'), + Item.objects.filter(name__in=["three", "four"]) + .exclude(tags__name="t1") + .order_by("name"), [self.i4, self.i3], ) # More twisted cases, involving nested negations. self.assertSequenceEqual( - Item.objects.exclude(~Q(tags__name='t1', name='one')), + Item.objects.exclude(~Q(tags__name="t1", name="one")), [self.i1], ) self.assertSequenceEqual( - Item.objects.filter(~Q(tags__name='t1', name='one'), name='two'), + Item.objects.filter(~Q(tags__name="t1", name="one"), name="two"), [self.i2], ) self.assertSequenceEqual( - Item.objects.exclude(~Q(tags__name='t1', name='one'), name='two'), + Item.objects.exclude(~Q(tags__name="t1", name="one"), name="two"), [self.i4, self.i1, self.i3], ) @@ -653,14 +806,11 @@ class Queries1Tests(TestCase): qs = Item.objects.select_related() query = qs.query.get_compiler(qs.db).as_sql()[0] query2 = pickle.loads(pickle.dumps(qs.query)) - self.assertEqual( - query2.get_compiler(qs.db).as_sql()[0], - query - ) + self.assertEqual(query2.get_compiler(qs.db).as_sql()[0], query) def test_deferred_load_qs_pickling(self): # Check pickling of deferred-loading querysets - qs = Item.objects.defer('name', 'creator') + qs = Item.objects.defer("name", "creator") q2 = pickle.loads(pickle.dumps(qs)) self.assertEqual(list(qs), list(q2)) q3 = pickle.loads(pickle.dumps(qs, pickle.HIGHEST_PROTOCOL)) @@ -669,7 +819,9 @@ class Queries1Tests(TestCase): def test_ticket7277(self): self.assertSequenceEqual( self.n1.annotation_set.filter( - Q(tag=self.t5) | Q(tag__children=self.t5) | Q(tag__children__children=self.t5) + Q(tag=self.t5) + | Q(tag__children=self.t5) + | Q(tag__children__children=self.t5) ), [self.ann1], ) @@ -684,39 +836,33 @@ class Queries1Tests(TestCase): def test_ticket7235(self): # An EmptyQuerySet should not raise exceptions if it is filtered. - Eaten.objects.create(meal='m') + Eaten.objects.create(meal="m") q = Eaten.objects.none() with self.assertNumQueries(0): self.assertQuerysetEqual(q.all(), []) - self.assertQuerysetEqual(q.filter(meal='m'), []) - self.assertQuerysetEqual(q.exclude(meal='m'), []) - self.assertQuerysetEqual(q.complex_filter({'pk': 1}), []) - self.assertQuerysetEqual(q.select_related('food'), []) - self.assertQuerysetEqual(q.annotate(Count('food')), []) - self.assertQuerysetEqual(q.order_by('meal', 'food'), []) + self.assertQuerysetEqual(q.filter(meal="m"), []) + self.assertQuerysetEqual(q.exclude(meal="m"), []) + self.assertQuerysetEqual(q.complex_filter({"pk": 1}), []) + self.assertQuerysetEqual(q.select_related("food"), []) + self.assertQuerysetEqual(q.annotate(Count("food")), []) + self.assertQuerysetEqual(q.order_by("meal", "food"), []) self.assertQuerysetEqual(q.distinct(), []) - self.assertQuerysetEqual( - q.extra(select={'foo': "1"}), - [] - ) + self.assertQuerysetEqual(q.extra(select={"foo": "1"}), []) self.assertQuerysetEqual(q.reverse(), []) q.query.low_mark = 1 - msg = 'Cannot change a query once a slice has been taken.' + msg = "Cannot change a query once a slice has been taken." with self.assertRaisesMessage(TypeError, msg): - q.extra(select={'foo': "1"}) - self.assertQuerysetEqual(q.defer('meal'), []) - self.assertQuerysetEqual(q.only('meal'), []) + q.extra(select={"foo": "1"}) + self.assertQuerysetEqual(q.defer("meal"), []) + self.assertQuerysetEqual(q.only("meal"), []) def test_ticket7791(self): # There were "issues" when ordering and distinct-ing on fields related # via ForeignKeys. - self.assertEqual( - len(Note.objects.order_by('extrainfo__info').distinct()), - 3 - ) + self.assertEqual(len(Note.objects.order_by("extrainfo__info").distinct()), 3) # Pickling of QuerySets using datetimes() should work. - qs = Item.objects.datetimes('created', 'month') + qs = Item.objects.datetimes("created", "month") pickle.loads(pickle.dumps(qs)) def test_ticket9997(self): @@ -724,24 +870,38 @@ class Queries1Tests(TestCase): # make sure it's only requesting a single value and use that as the # thing to select. self.assertSequenceEqual( - Tag.objects.filter(name__in=Tag.objects.filter(parent=self.t1).values('name')), + Tag.objects.filter( + name__in=Tag.objects.filter(parent=self.t1).values("name") + ), [self.t2, self.t3], ) # Multi-valued values() and values_list() querysets should raise errors. - with self.assertRaisesMessage(TypeError, 'Cannot use multi-field values as a filter value.'): - Tag.objects.filter(name__in=Tag.objects.filter(parent=self.t1).values('name', 'id')) - with self.assertRaisesMessage(TypeError, 'Cannot use multi-field values as a filter value.'): - Tag.objects.filter(name__in=Tag.objects.filter(parent=self.t1).values_list('name', 'id')) + with self.assertRaisesMessage( + TypeError, "Cannot use multi-field values as a filter value." + ): + Tag.objects.filter( + name__in=Tag.objects.filter(parent=self.t1).values("name", "id") + ) + with self.assertRaisesMessage( + TypeError, "Cannot use multi-field values as a filter value." + ): + Tag.objects.filter( + name__in=Tag.objects.filter(parent=self.t1).values_list("name", "id") + ) def test_ticket9985(self): # qs.values_list(...).values(...) combinations should work. self.assertSequenceEqual( Note.objects.values_list("note", flat=True).values("id").order_by("id"), - [{'id': 1}, {'id': 2}, {'id': 3}] + [{"id": 1}, {"id": 2}, {"id": 3}], ) self.assertSequenceEqual( - Annotation.objects.filter(notes__in=Note.objects.filter(note="n1").values_list('note').values('id')), + Annotation.objects.filter( + notes__in=Note.objects.filter(note="n1") + .values_list("note") + .values("id") + ), [self.ann1], ) @@ -754,10 +914,12 @@ class Queries1Tests(TestCase): # Testing an empty "__in" filter with a generator as the value. def f(): return iter([]) + n_obj = Note.objects.all()[0] def g(): yield n_obj.pk + self.assertQuerysetEqual(Note.objects.filter(pk__in=f()), []) self.assertEqual(list(Note.objects.filter(pk__in=g())), [n_obj]) @@ -780,7 +942,7 @@ class Queries1Tests(TestCase): subq = Author.objects.filter(num__lt=3000) self.assertSequenceEqual( - Author.objects.filter(Q(pk__in=subq) & Q(name='a1')), + Author.objects.filter(Q(pk__in=subq) & Q(name="a1")), [self.a1], ) @@ -790,7 +952,7 @@ class Queries1Tests(TestCase): def test_ticket7076(self): # Excluding shouldn't eliminate NULL entries. self.assertSequenceEqual( - Item.objects.exclude(modified=self.time1).order_by('name'), + Item.objects.exclude(modified=self.time1).order_by("name"), [self.i4, self.i3, self.i2], ) self.assertSequenceEqual( @@ -802,7 +964,7 @@ class Queries1Tests(TestCase): # Ordering by related tables should accommodate nullable fields (this # test is a little tricky, since NULL ordering is database dependent. # Instead, we just count the number of results). - self.assertEqual(len(Tag.objects.order_by('parent__name')), 5) + self.assertEqual(len(Tag.objects.order_by("parent__name")), 5) # Empty querysets can be merged with others. self.assertSequenceEqual( @@ -820,26 +982,38 @@ class Queries1Tests(TestCase): # Complex combinations of conjunctions, disjunctions and nullable # relations. self.assertSequenceEqual( - Author.objects.filter(Q(item__note__extrainfo=self.e2) | Q(report=self.r1, name='xyz')), + Author.objects.filter( + Q(item__note__extrainfo=self.e2) | Q(report=self.r1, name="xyz") + ), [self.a2], ) self.assertSequenceEqual( - Author.objects.filter(Q(report=self.r1, name='xyz') | Q(item__note__extrainfo=self.e2)), + Author.objects.filter( + Q(report=self.r1, name="xyz") | Q(item__note__extrainfo=self.e2) + ), [self.a2], ) self.assertSequenceEqual( - Annotation.objects.filter(Q(tag__parent=self.t1) | Q(notes__note='n1', name='a1')), + Annotation.objects.filter( + Q(tag__parent=self.t1) | Q(notes__note="n1", name="a1") + ), [self.ann1], ) - xx = ExtraInfo.objects.create(info='xx', note=self.n3) + xx = ExtraInfo.objects.create(info="xx", note=self.n3) self.assertSequenceEqual( Note.objects.filter(Q(extrainfo__author=self.a1) | Q(extrainfo=xx)), [self.n1, self.n3], ) q = Note.objects.filter(Q(extrainfo__author=self.a1) | Q(extrainfo=xx)).query self.assertEqual( - len([x for x in q.alias_map.values() if x.join_type == LOUTER and q.alias_refcount[x.table_alias]]), - 1 + len( + [ + x + for x in q.alias_map.values() + if x.join_type == LOUTER and q.alias_refcount[x.table_alias] + ] + ), + 1, ) def test_ticket17429(self): @@ -858,35 +1032,43 @@ class Queries1Tests(TestCase): def test_exclude(self): self.assertQuerysetEqual( - Item.objects.exclude(tags__name='t4'), - Item.objects.filter(~Q(tags__name='t4'))) + Item.objects.exclude(tags__name="t4"), + Item.objects.filter(~Q(tags__name="t4")), + ) self.assertQuerysetEqual( - Item.objects.exclude(Q(tags__name='t4') | Q(tags__name='t3')), - Item.objects.filter(~(Q(tags__name='t4') | Q(tags__name='t3')))) + Item.objects.exclude(Q(tags__name="t4") | Q(tags__name="t3")), + Item.objects.filter(~(Q(tags__name="t4") | Q(tags__name="t3"))), + ) self.assertQuerysetEqual( - Item.objects.exclude(Q(tags__name='t4') | ~Q(tags__name='t3')), - Item.objects.filter(~(Q(tags__name='t4') | ~Q(tags__name='t3')))) + Item.objects.exclude(Q(tags__name="t4") | ~Q(tags__name="t3")), + Item.objects.filter(~(Q(tags__name="t4") | ~Q(tags__name="t3"))), + ) def test_nested_exclude(self): self.assertQuerysetEqual( - Item.objects.exclude(~Q(tags__name='t4')), - Item.objects.filter(~~Q(tags__name='t4'))) + Item.objects.exclude(~Q(tags__name="t4")), + Item.objects.filter(~~Q(tags__name="t4")), + ) def test_double_exclude(self): self.assertQuerysetEqual( - Item.objects.filter(Q(tags__name='t4')), - Item.objects.filter(~~Q(tags__name='t4'))) + Item.objects.filter(Q(tags__name="t4")), + Item.objects.filter(~~Q(tags__name="t4")), + ) self.assertQuerysetEqual( - Item.objects.filter(Q(tags__name='t4')), - Item.objects.filter(~Q(~Q(tags__name='t4')))) + Item.objects.filter(Q(tags__name="t4")), + Item.objects.filter(~Q(~Q(tags__name="t4"))), + ) def test_exclude_in(self): self.assertQuerysetEqual( - Item.objects.exclude(Q(tags__name__in=['t4', 't3'])), - Item.objects.filter(~Q(tags__name__in=['t4', 't3']))) + Item.objects.exclude(Q(tags__name__in=["t4", "t3"])), + Item.objects.filter(~Q(tags__name__in=["t4", "t3"])), + ) self.assertQuerysetEqual( - Item.objects.filter(Q(tags__name__in=['t4', 't3'])), - Item.objects.filter(~~Q(tags__name__in=['t4', 't3']))) + Item.objects.filter(Q(tags__name__in=["t4", "t3"])), + Item.objects.filter(~~Q(tags__name__in=["t4", "t3"])), + ) def test_ticket_10790_1(self): # Querying direct fields with isnull should trim the left outer join. @@ -894,26 +1076,26 @@ class Queries1Tests(TestCase): q = Tag.objects.filter(parent__isnull=True) self.assertSequenceEqual(q, [self.t1]) - self.assertNotIn('JOIN', str(q.query)) + self.assertNotIn("JOIN", str(q.query)) q = Tag.objects.filter(parent__isnull=False) self.assertSequenceEqual(q, [self.t2, self.t3, self.t4, self.t5]) - self.assertNotIn('JOIN', str(q.query)) + self.assertNotIn("JOIN", str(q.query)) q = Tag.objects.exclude(parent__isnull=True) self.assertSequenceEqual(q, [self.t2, self.t3, self.t4, self.t5]) - self.assertNotIn('JOIN', str(q.query)) + self.assertNotIn("JOIN", str(q.query)) q = Tag.objects.exclude(parent__isnull=False) self.assertSequenceEqual(q, [self.t1]) - self.assertNotIn('JOIN', str(q.query)) + self.assertNotIn("JOIN", str(q.query)) q = Tag.objects.exclude(parent__parent__isnull=False) self.assertSequenceEqual(q, [self.t1, self.t2, self.t3]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 1) - self.assertNotIn('INNER JOIN', str(q.query)) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 1) + self.assertNotIn("INNER JOIN", str(q.query)) def test_ticket_10790_2(self): # Querying across several tables should strip only the last outer join, @@ -921,58 +1103,58 @@ class Queries1Tests(TestCase): q = Tag.objects.filter(parent__parent__isnull=False) self.assertSequenceEqual(q, [self.t4, self.t5]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q.query).count('INNER JOIN'), 1) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q.query).count("INNER JOIN"), 1) # Querying without isnull should not convert anything to left outer join. q = Tag.objects.filter(parent__parent=self.t1) self.assertSequenceEqual(q, [self.t4, self.t5]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q.query).count('INNER JOIN'), 1) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q.query).count("INNER JOIN"), 1) def test_ticket_10790_3(self): # Querying via indirect fields should populate the left outer join q = NamedCategory.objects.filter(tag__isnull=True) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 1) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 1) # join to dumbcategory ptr_id - self.assertEqual(str(q.query).count('INNER JOIN'), 1) + self.assertEqual(str(q.query).count("INNER JOIN"), 1) self.assertSequenceEqual(q, []) # Querying across several tables should strip only the last join, while # preserving the preceding left outer joins. q = NamedCategory.objects.filter(tag__parent__isnull=True) - self.assertEqual(str(q.query).count('INNER JOIN'), 1) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 1) + self.assertEqual(str(q.query).count("INNER JOIN"), 1) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 1) self.assertSequenceEqual(q, [self.nc1]) def test_ticket_10790_4(self): # Querying across m2m field should not strip the m2m table from join. q = Author.objects.filter(item__tags__isnull=True) self.assertSequenceEqual(q, [self.a2, self.a3]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 2) - self.assertNotIn('INNER JOIN', str(q.query)) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 2) + self.assertNotIn("INNER JOIN", str(q.query)) q = Author.objects.filter(item__tags__parent__isnull=True) self.assertSequenceEqual(q, [self.a1, self.a2, self.a2, self.a3]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 3) - self.assertNotIn('INNER JOIN', str(q.query)) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 3) + self.assertNotIn("INNER JOIN", str(q.query)) def test_ticket_10790_5(self): # Querying with isnull=False across m2m field should not create outer joins q = Author.objects.filter(item__tags__isnull=False) self.assertSequenceEqual(q, [self.a1, self.a1, self.a2, self.a2, self.a4]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q.query).count('INNER JOIN'), 2) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q.query).count("INNER JOIN"), 2) q = Author.objects.filter(item__tags__parent__isnull=False) self.assertSequenceEqual(q, [self.a1, self.a2, self.a4]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q.query).count('INNER JOIN'), 3) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q.query).count("INNER JOIN"), 3) q = Author.objects.filter(item__tags__parent__parent__isnull=False) self.assertSequenceEqual(q, [self.a4]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q.query).count('INNER JOIN'), 4) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q.query).count("INNER JOIN"), 4) def test_ticket_10790_6(self): # Querying with isnull=True across m2m field should not create inner joins @@ -982,32 +1164,32 @@ class Queries1Tests(TestCase): q, [self.a1, self.a1, self.a2, self.a2, self.a2, self.a3], ) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 4) - self.assertEqual(str(q.query).count('INNER JOIN'), 0) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 4) + self.assertEqual(str(q.query).count("INNER JOIN"), 0) q = Author.objects.filter(item__tags__parent__isnull=True) self.assertSequenceEqual(q, [self.a1, self.a2, self.a2, self.a3]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 3) - self.assertEqual(str(q.query).count('INNER JOIN'), 0) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 3) + self.assertEqual(str(q.query).count("INNER JOIN"), 0) def test_ticket_10790_7(self): # Reverse querying with isnull should not strip the join q = Author.objects.filter(item__isnull=True) self.assertSequenceEqual(q, [self.a3]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 1) - self.assertEqual(str(q.query).count('INNER JOIN'), 0) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 1) + self.assertEqual(str(q.query).count("INNER JOIN"), 0) q = Author.objects.filter(item__isnull=False) self.assertSequenceEqual(q, [self.a1, self.a2, self.a2, self.a4]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q.query).count('INNER JOIN'), 1) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q.query).count("INNER JOIN"), 1) def test_ticket_10790_8(self): # Querying with combined q-objects should also strip the left outer join q = Tag.objects.filter(Q(parent__isnull=True) | Q(parent=self.t1)) self.assertSequenceEqual(q, [self.t1, self.t2, self.t3]) - self.assertEqual(str(q.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q.query).count('INNER JOIN'), 0) + self.assertEqual(str(q.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q.query).count("INNER JOIN"), 0) def test_ticket_10790_combine(self): # Combining queries should not re-populate the left outer join @@ -1016,50 +1198,52 @@ class Queries1Tests(TestCase): q3 = q1 | q2 self.assertSequenceEqual(q3, [self.t1, self.t2, self.t3, self.t4, self.t5]) - self.assertEqual(str(q3.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q3.query).count('INNER JOIN'), 0) + self.assertEqual(str(q3.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q3.query).count("INNER JOIN"), 0) q3 = q1 & q2 self.assertSequenceEqual(q3, []) - self.assertEqual(str(q3.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q3.query).count('INNER JOIN'), 0) + self.assertEqual(str(q3.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q3.query).count("INNER JOIN"), 0) q2 = Tag.objects.filter(parent=self.t1) q3 = q1 | q2 self.assertSequenceEqual(q3, [self.t1, self.t2, self.t3]) - self.assertEqual(str(q3.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q3.query).count('INNER JOIN'), 0) + self.assertEqual(str(q3.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q3.query).count("INNER JOIN"), 0) q3 = q2 | q1 self.assertSequenceEqual(q3, [self.t1, self.t2, self.t3]) - self.assertEqual(str(q3.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(q3.query).count('INNER JOIN'), 0) + self.assertEqual(str(q3.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(q3.query).count("INNER JOIN"), 0) q1 = Tag.objects.filter(parent__isnull=True) q2 = Tag.objects.filter(parent__parent__isnull=True) q3 = q1 | q2 self.assertSequenceEqual(q3, [self.t1, self.t2, self.t3]) - self.assertEqual(str(q3.query).count('LEFT OUTER JOIN'), 1) - self.assertEqual(str(q3.query).count('INNER JOIN'), 0) + self.assertEqual(str(q3.query).count("LEFT OUTER JOIN"), 1) + self.assertEqual(str(q3.query).count("INNER JOIN"), 0) q3 = q2 | q1 self.assertSequenceEqual(q3, [self.t1, self.t2, self.t3]) - self.assertEqual(str(q3.query).count('LEFT OUTER JOIN'), 1) - self.assertEqual(str(q3.query).count('INNER JOIN'), 0) + self.assertEqual(str(q3.query).count("LEFT OUTER JOIN"), 1) + self.assertEqual(str(q3.query).count("INNER JOIN"), 0) def test_ticket19672(self): self.assertSequenceEqual( - Report.objects.filter(Q(creator__isnull=False) & ~Q(creator__extra__value=41)), + Report.objects.filter( + Q(creator__isnull=False) & ~Q(creator__extra__value=41) + ), [self.r1], ) def test_ticket_20250(self): # A negated Q along with an annotated queryset failed in Django 1.4 - qs = Author.objects.annotate(Count('item')) - qs = qs.filter(~Q(extra__value=0)).order_by('name') + qs = Author.objects.annotate(Count("item")) + qs = qs.filter(~Q(extra__value=0)).order_by("name") - self.assertIn('SELECT', str(qs.query)) + self.assertIn("SELECT", str(qs.query)) self.assertSequenceEqual(qs, [self.a1, self.a2, self.a3, self.a4]) def test_lookup_constraint_fielderror(self): @@ -1069,41 +1253,47 @@ class Queries1Tests(TestCase): "managedmodel, name, note, parent, parent_id" ) with self.assertRaisesMessage(FieldError, msg): - Tag.objects.filter(unknown_field__name='generic') + Tag.objects.filter(unknown_field__name="generic") def test_common_mixed_case_foreign_keys(self): """ Valid query should be generated when fields fetched from joined tables include FKs whose names only differ by case. """ - c1 = SimpleCategory.objects.create(name='c1') - c2 = SimpleCategory.objects.create(name='c2') - c3 = SimpleCategory.objects.create(name='c3') + c1 = SimpleCategory.objects.create(name="c1") + c2 = SimpleCategory.objects.create(name="c2") + c3 = SimpleCategory.objects.create(name="c3") category = CategoryItem.objects.create(category=c1) - mixed_case_field_category = MixedCaseFieldCategoryItem.objects.create(CaTeGoRy=c2) - mixed_case_db_column_category = MixedCaseDbColumnCategoryItem.objects.create(category=c3) + mixed_case_field_category = MixedCaseFieldCategoryItem.objects.create( + CaTeGoRy=c2 + ) + mixed_case_db_column_category = MixedCaseDbColumnCategoryItem.objects.create( + category=c3 + ) CommonMixedCaseForeignKeys.objects.create( category=category, mixed_case_field_category=mixed_case_field_category, mixed_case_db_column_category=mixed_case_db_column_category, ) qs = CommonMixedCaseForeignKeys.objects.values( - 'category', - 'mixed_case_field_category', - 'mixed_case_db_column_category', - 'category__category', - 'mixed_case_field_category__CaTeGoRy', - 'mixed_case_db_column_category__category', + "category", + "mixed_case_field_category", + "mixed_case_db_column_category", + "category__category", + "mixed_case_field_category__CaTeGoRy", + "mixed_case_db_column_category__category", ) self.assertTrue(qs.first()) def test_excluded_intermediary_m2m_table_joined(self): self.assertSequenceEqual( - Note.objects.filter(~Q(tag__annotation__name=F('note'))), + Note.objects.filter(~Q(tag__annotation__name=F("note"))), [self.n1, self.n2, self.n3], ) self.assertSequenceEqual( - Note.objects.filter(tag__annotation__name='a1').filter(~Q(tag__annotation__name=F('note'))), + Note.objects.filter(tag__annotation__name="a1").filter( + ~Q(tag__annotation__name=F("note")) + ), [], ) @@ -1138,16 +1328,13 @@ class Queries2Tests(TestCase): [self.num12], ) self.assertSequenceEqual( - Number.objects.filter(Q(num__lt=4) | Q(num__gt=8, num__lt=12)), - [] + Number.objects.filter(Q(num__lt=4) | Q(num__gt=8, num__lt=12)), [] ) self.assertSequenceEqual( - Number.objects.filter(Q(num__gt=8, num__lt=12) | Q(num__lt=4)), - [] + Number.objects.filter(Q(num__gt=8, num__lt=12) | Q(num__lt=4)), [] ) self.assertSequenceEqual( - Number.objects.filter(Q(num__gt=8) & Q(num__lt=12) | Q(num__lt=4)), - [] + Number.objects.filter(Q(num__gt=8) & Q(num__lt=12) | Q(num__lt=4)), [] ) self.assertSequenceEqual( Number.objects.filter(Q(num__gt=7) & Q(num__lt=12) | Q(num__lt=4)), @@ -1219,6 +1406,7 @@ class Queries2Tests(TestCase): def run(): for obj in qs: return qs.count() == count + self.assertTrue(run()) @@ -1232,13 +1420,17 @@ class Queries3Tests(TestCase): # wrong type of field. msg = "'name' isn't a DateField, TimeField, or DateTimeField." with self.assertRaisesMessage(TypeError, msg): - Item.objects.datetimes('name', 'month') + Item.objects.datetimes("name", "month") def test_ticket22023(self): - with self.assertRaisesMessage(TypeError, "Cannot call only() after .values() or .values_list()"): + with self.assertRaisesMessage( + TypeError, "Cannot call only() after .values() or .values_list()" + ): Valid.objects.values().only() - with self.assertRaisesMessage(TypeError, "Cannot call defer() after .values() or .values_list()"): + with self.assertRaisesMessage( + TypeError, "Cannot call defer() after .values() or .values_list()" + ): Valid.objects.values().defer() @@ -1246,38 +1438,44 @@ class Queries4Tests(TestCase): @classmethod def setUpTestData(cls): generic = NamedCategory.objects.create(name="Generic") - cls.t1 = Tag.objects.create(name='t1', category=generic) + cls.t1 = Tag.objects.create(name="t1", category=generic) - n1 = Note.objects.create(note='n1', misc='foo') - n2 = Note.objects.create(note='n2', misc='bar') + n1 = Note.objects.create(note="n1", misc="foo") + n2 = Note.objects.create(note="n2", misc="bar") - e1 = ExtraInfo.objects.create(info='e1', note=n1) - e2 = ExtraInfo.objects.create(info='e2', note=n2) + e1 = ExtraInfo.objects.create(info="e1", note=n1) + e2 = ExtraInfo.objects.create(info="e2", note=n2) - cls.a1 = Author.objects.create(name='a1', num=1001, extra=e1) - cls.a3 = Author.objects.create(name='a3', num=3003, extra=e2) + cls.a1 = Author.objects.create(name="a1", num=1001, extra=e1) + cls.a3 = Author.objects.create(name="a3", num=3003, extra=e2) - cls.r1 = Report.objects.create(name='r1', creator=cls.a1) - cls.r2 = Report.objects.create(name='r2', creator=cls.a3) - cls.r3 = Report.objects.create(name='r3') + cls.r1 = Report.objects.create(name="r1", creator=cls.a1) + cls.r2 = Report.objects.create(name="r2", creator=cls.a3) + cls.r3 = Report.objects.create(name="r3") - cls.i1 = Item.objects.create(name='i1', created=datetime.datetime.now(), note=n1, creator=cls.a1) - cls.i2 = Item.objects.create(name='i2', created=datetime.datetime.now(), note=n1, creator=cls.a3) + cls.i1 = Item.objects.create( + name="i1", created=datetime.datetime.now(), note=n1, creator=cls.a1 + ) + cls.i2 = Item.objects.create( + name="i2", created=datetime.datetime.now(), note=n1, creator=cls.a3 + ) def test_ticket24525(self): tag = Tag.objects.create() - anth100 = tag.note_set.create(note='ANTH', misc='100') - math101 = tag.note_set.create(note='MATH', misc='101') - s1 = tag.annotation_set.create(name='1') - s2 = tag.annotation_set.create(name='2') + anth100 = tag.note_set.create(note="ANTH", misc="100") + math101 = tag.note_set.create(note="MATH", misc="101") + s1 = tag.annotation_set.create(name="1") + s2 = tag.annotation_set.create(name="2") s1.notes.set([math101, anth100]) s2.notes.set([math101]) - result = math101.annotation_set.all() & tag.annotation_set.exclude(notes__in=[anth100]) + result = math101.annotation_set.all() & tag.annotation_set.exclude( + notes__in=[anth100] + ) self.assertEqual(list(result), [s2]) def test_ticket11811(self): unsaved_category = NamedCategory(name="Other") - msg = 'Unsaved model instance <NamedCategory: Other> cannot be used in an ORM query.' + msg = "Unsaved model instance <NamedCategory: Other> cannot be used in an ORM query." with self.assertRaisesMessage(ValueError, msg): Tag.objects.filter(pk=self.t1.pk).update(category=unsaved_category) @@ -1286,29 +1484,40 @@ class Queries4Tests(TestCase): # about the join type of the trimmed "creator__isnull" join. If we # don't have that information, then the join is created as INNER JOIN # and results will be incorrect. - q1 = Report.objects.filter(Q(creator__isnull=True) | Q(creator__extra__info='e1')) - q2 = Report.objects.filter(Q(creator__isnull=True)) | Report.objects.filter(Q(creator__extra__info='e1')) + q1 = Report.objects.filter( + Q(creator__isnull=True) | Q(creator__extra__info="e1") + ) + q2 = Report.objects.filter(Q(creator__isnull=True)) | Report.objects.filter( + Q(creator__extra__info="e1") + ) self.assertCountEqual(q1, [self.r1, self.r3]) self.assertEqual(str(q1.query), str(q2.query)) - q1 = Report.objects.filter(Q(creator__extra__info='e1') | Q(creator__isnull=True)) - q2 = Report.objects.filter(Q(creator__extra__info='e1')) | Report.objects.filter(Q(creator__isnull=True)) + q1 = Report.objects.filter( + Q(creator__extra__info="e1") | Q(creator__isnull=True) + ) + q2 = Report.objects.filter( + Q(creator__extra__info="e1") + ) | Report.objects.filter(Q(creator__isnull=True)) self.assertCountEqual(q1, [self.r1, self.r3]) self.assertEqual(str(q1.query), str(q2.query)) - q1 = Item.objects.filter(Q(creator=self.a1) | Q(creator__report__name='r1')).order_by() + q1 = Item.objects.filter( + Q(creator=self.a1) | Q(creator__report__name="r1") + ).order_by() q2 = ( - Item.objects - .filter(Q(creator=self.a1)).order_by() | Item.objects.filter(Q(creator__report__name='r1')) - .order_by() + Item.objects.filter(Q(creator=self.a1)).order_by() + | Item.objects.filter(Q(creator__report__name="r1")).order_by() ) self.assertCountEqual(q1, [self.i1]) self.assertEqual(str(q1.query), str(q2.query)) - q1 = Item.objects.filter(Q(creator__report__name='e1') | Q(creator=self.a1)).order_by() + q1 = Item.objects.filter( + Q(creator__report__name="e1") | Q(creator=self.a1) + ).order_by() q2 = ( - Item.objects.filter(Q(creator__report__name='e1')).order_by() | - Item.objects.filter(Q(creator=self.a1)).order_by() + Item.objects.filter(Q(creator__report__name="e1")).order_by() + | Item.objects.filter(Q(creator=self.a1)).order_by() ) self.assertCountEqual(q1, [self.i1]) self.assertEqual(str(q1.query), str(q2.query)) @@ -1316,17 +1525,17 @@ class Queries4Tests(TestCase): def test_combine_join_reuse(self): # Joins having identical connections are correctly recreated in the # rhs query, in case the query is ORed together (#18748). - Report.objects.create(name='r4', creator=self.a1) - q1 = Author.objects.filter(report__name='r5') - q2 = Author.objects.filter(report__name='r4').filter(report__name='r1') + Report.objects.create(name="r4", creator=self.a1) + q1 = Author.objects.filter(report__name="r5") + q2 = Author.objects.filter(report__name="r4").filter(report__name="r1") combined = q1 | q2 - self.assertEqual(str(combined.query).count('JOIN'), 2) + self.assertEqual(str(combined.query).count("JOIN"), 2) self.assertEqual(len(combined), 1) - self.assertEqual(combined[0].name, 'a1') + self.assertEqual(combined[0].name, "a1") def test_combine_or_filter_reuse(self): - combined = Author.objects.filter(name='a1') | Author.objects.filter(name='a3') - self.assertEqual(combined.get(name='a1'), self.a1) + combined = Author.objects.filter(name="a1") | Author.objects.filter(name="a3") + self.assertEqual(combined.get(name="a1"), self.a1) def test_join_reuse_order(self): # Join aliases are reused in order. This shouldn't raise AssertionError @@ -1343,27 +1552,29 @@ class Queries4Tests(TestCase): def test_ticket7095(self): # Updates that are filtered on the model being updated are somewhat # tricky in MySQL. - ManagedModel.objects.create(data='mm1', tag=self.t1, public=True) - self.assertEqual(ManagedModel.objects.update(data='mm'), 1) + ManagedModel.objects.create(data="mm1", tag=self.t1, public=True) + self.assertEqual(ManagedModel.objects.update(data="mm"), 1) # A values() or values_list() query across joined models must use outer # joins appropriately. # Note: In Oracle, we expect a null CharField to return '' instead of # None. if connection.features.interprets_empty_strings_as_nulls: - expected_null_charfield_repr = '' + expected_null_charfield_repr = "" else: expected_null_charfield_repr = None self.assertSequenceEqual( - Report.objects.values_list("creator__extra__info", flat=True).order_by("name"), - ['e1', 'e2', expected_null_charfield_repr], + Report.objects.values_list("creator__extra__info", flat=True).order_by( + "name" + ), + ["e1", "e2", expected_null_charfield_repr], ) # Similarly for select_related(), joins beyond an initial nullable join # must use outer joins so that all results are included. self.assertSequenceEqual( Report.objects.select_related("creator", "creator__extra").order_by("name"), - [self.r1, self.r2, self.r3] + [self.r1, self.r2, self.r3], ) # When there are multiple paths to a table from another table, we have @@ -1377,13 +1588,13 @@ class Queries4Tests(TestCase): m2 = Member.objects.create(name="m2", details=d2) Child.objects.create(person=m2, parent=m1) obj = m1.children.select_related("person__details")[0] - self.assertEqual(obj.person.details.data, 'd2') + self.assertEqual(obj.person.details.data, "d2") def test_order_by_resetting(self): # Calling order_by() with no parameters removes any existing ordering on the # model. But it should still be possible to add new ordering after that. - qs = Author.objects.order_by().order_by('name') - self.assertIn('ORDER BY', qs.query.get_compiler(qs.db).as_sql()[0]) + qs = Author.objects.order_by().order_by("name") + self.assertIn("ORDER BY", qs.query.get_compiler(qs.db).as_sql()[0]) def test_order_by_reverse_fk(self): # It is possible to order by reverse of foreign key, although that can lead @@ -1393,11 +1604,13 @@ class Queries4Tests(TestCase): CategoryItem.objects.create(category=c1) CategoryItem.objects.create(category=c2) CategoryItem.objects.create(category=c1) - self.assertSequenceEqual(SimpleCategory.objects.order_by('categoryitem', 'pk'), [c1, c2, c1]) + self.assertSequenceEqual( + SimpleCategory.objects.order_by("categoryitem", "pk"), [c1, c2, c1] + ) def test_filter_reverse_non_integer_pk(self): date_obj = DateTimePK.objects.create() - extra_obj = ExtraInfo.objects.create(info='extra', date=date_obj) + extra_obj = ExtraInfo.objects.create(info="extra", date=date_obj) self.assertEqual( DateTimePK.objects.filter(extrainfo=extra_obj).get(), date_obj, @@ -1407,14 +1620,17 @@ class Queries4Tests(TestCase): # Avoid raising an EmptyResultSet if an inner query is probably # empty (and hence, not executed). self.assertQuerysetEqual( - Tag.objects.filter(id__in=Tag.objects.filter(id__in=[])), - [] + Tag.objects.filter(id__in=Tag.objects.filter(id__in=[])), [] ) def test_ticket15316_filter_false(self): c1 = SimpleCategory.objects.create(name="category1") - c2 = SpecialCategory.objects.create(name="named category1", special_name="special1") - c3 = SpecialCategory.objects.create(name="named category2", special_name="special2") + c2 = SpecialCategory.objects.create( + name="named category1", special_name="special1" + ) + c3 = SpecialCategory.objects.create( + name="named category2", special_name="special2" + ) CategoryItem.objects.create(category=c1) ci2 = CategoryItem.objects.create(category=c2) @@ -1426,8 +1642,12 @@ class Queries4Tests(TestCase): def test_ticket15316_exclude_false(self): c1 = SimpleCategory.objects.create(name="category1") - c2 = SpecialCategory.objects.create(name="named category1", special_name="special1") - c3 = SpecialCategory.objects.create(name="named category2", special_name="special2") + c2 = SpecialCategory.objects.create( + name="named category1", special_name="special1" + ) + c3 = SpecialCategory.objects.create( + name="named category2", special_name="special2" + ) ci1 = CategoryItem.objects.create(category=c1) CategoryItem.objects.create(category=c2) @@ -1439,8 +1659,12 @@ class Queries4Tests(TestCase): def test_ticket15316_filter_true(self): c1 = SimpleCategory.objects.create(name="category1") - c2 = SpecialCategory.objects.create(name="named category1", special_name="special1") - c3 = SpecialCategory.objects.create(name="named category2", special_name="special2") + c2 = SpecialCategory.objects.create( + name="named category1", special_name="special1" + ) + c3 = SpecialCategory.objects.create( + name="named category2", special_name="special2" + ) ci1 = CategoryItem.objects.create(category=c1) CategoryItem.objects.create(category=c2) @@ -1452,8 +1676,12 @@ class Queries4Tests(TestCase): def test_ticket15316_exclude_true(self): c1 = SimpleCategory.objects.create(name="category1") - c2 = SpecialCategory.objects.create(name="named category1", special_name="special1") - c3 = SpecialCategory.objects.create(name="named category2", special_name="special2") + c2 = SpecialCategory.objects.create( + name="named category1", special_name="special1" + ) + c3 = SpecialCategory.objects.create( + name="named category2", special_name="special2" + ) CategoryItem.objects.create(category=c1) ci2 = CategoryItem.objects.create(category=c2) @@ -1475,7 +1703,9 @@ class Queries4Tests(TestCase): ci2 = CategoryItem.objects.create(category=c0) ci3 = CategoryItem.objects.create(category=c1) - qs = CategoryItem.objects.filter(category__onetoonecategory__isnull=False).order_by('pk') + qs = CategoryItem.objects.filter( + category__onetoonecategory__isnull=False + ).order_by("pk") self.assertEqual(qs.count(), 2) self.assertSequenceEqual(qs, [ci2, ci3]) @@ -1523,7 +1753,9 @@ class Queries4Tests(TestCase): ci2 = CategoryItem.objects.create(category=c0) ci3 = CategoryItem.objects.create(category=c1) - qs = CategoryItem.objects.exclude(category__onetoonecategory__isnull=True).order_by('pk') + qs = CategoryItem.objects.exclude( + category__onetoonecategory__isnull=True + ).order_by("pk") self.assertEqual(qs.count(), 2) self.assertSequenceEqual(qs, [ci2, ci3]) @@ -1533,13 +1765,13 @@ class Queries5Tests(TestCase): def setUpTestData(cls): # Ordering by 'rank' gives us rank2, rank1, rank3. Ordering by the # Meta.ordering will be rank3, rank2, rank1. - cls.n1 = Note.objects.create(note='n1', misc='foo', id=1) - cls.n2 = Note.objects.create(note='n2', misc='bar', id=2) - e1 = ExtraInfo.objects.create(info='e1', note=cls.n1) - e2 = ExtraInfo.objects.create(info='e2', note=cls.n2) - a1 = Author.objects.create(name='a1', num=1001, extra=e1) - a2 = Author.objects.create(name='a2', num=2002, extra=e1) - a3 = Author.objects.create(name='a3', num=3003, extra=e2) + cls.n1 = Note.objects.create(note="n1", misc="foo", id=1) + cls.n2 = Note.objects.create(note="n2", misc="bar", id=2) + e1 = ExtraInfo.objects.create(info="e1", note=cls.n1) + e2 = ExtraInfo.objects.create(info="e2", note=cls.n2) + a1 = Author.objects.create(name="a1", num=1001, extra=e1) + a2 = Author.objects.create(name="a2", num=2002, extra=e1) + a3 = Author.objects.create(name="a3", num=3003, extra=e2) cls.rank2 = Ranking.objects.create(rank=2, author=a2) cls.rank1 = Ranking.objects.create(rank=1, author=a3) cls.rank3 = Ranking.objects.create(rank=3, author=a1) @@ -1551,53 +1783,55 @@ class Queries5Tests(TestCase): [self.rank3, self.rank2, self.rank1], ) self.assertSequenceEqual( - Ranking.objects.all().order_by('rank'), + Ranking.objects.all().order_by("rank"), [self.rank1, self.rank2, self.rank3], ) # Ordering of extra() pieces is possible, too and you can mix extra # fields and model fields in the ordering. self.assertSequenceEqual( - Ranking.objects.extra(tables=['django_site'], order_by=['-django_site.id', 'rank']), + Ranking.objects.extra( + tables=["django_site"], order_by=["-django_site.id", "rank"] + ), [self.rank1, self.rank2, self.rank3], ) - sql = 'case when %s > 2 then 1 else 0 end' % connection.ops.quote_name('rank') - qs = Ranking.objects.extra(select={'good': sql}) + sql = "case when %s > 2 then 1 else 0 end" % connection.ops.quote_name("rank") + qs = Ranking.objects.extra(select={"good": sql}) self.assertEqual( - [o.good for o in qs.extra(order_by=('-good',))], - [True, False, False] + [o.good for o in qs.extra(order_by=("-good",))], [True, False, False] ) self.assertSequenceEqual( - qs.extra(order_by=('-good', 'id')), + qs.extra(order_by=("-good", "id")), [self.rank3, self.rank2, self.rank1], ) # Despite having some extra aliases in the query, we can still omit # them in a values() query. - dicts = qs.values('id', 'rank').order_by('id') - self.assertEqual( - [d['rank'] for d in dicts], - [2, 1, 3] - ) + dicts = qs.values("id", "rank").order_by("id") + self.assertEqual([d["rank"] for d in dicts], [2, 1, 3]) def test_ticket7256(self): # An empty values() call includes all aliases, including those from an # extra() - sql = 'case when %s > 2 then 1 else 0 end' % connection.ops.quote_name('rank') - qs = Ranking.objects.extra(select={'good': sql}) - dicts = qs.values().order_by('id') + sql = "case when %s > 2 then 1 else 0 end" % connection.ops.quote_name("rank") + qs = Ranking.objects.extra(select={"good": sql}) + dicts = qs.values().order_by("id") for d in dicts: - del d['id'] - del d['author_id'] + del d["id"] + del d["author_id"] self.assertEqual( [sorted(d.items()) for d in dicts], - [[('good', 0), ('rank', 2)], [('good', 0), ('rank', 1)], [('good', 1), ('rank', 3)]] + [ + [("good", 0), ("rank", 2)], + [("good", 0), ("rank", 1)], + [("good", 1), ("rank", 3)], + ], ) def test_ticket7045(self): # Extra tables used to crash SQL construction on the second use. - qs = Ranking.objects.extra(tables=['django_site']) + qs = Ranking.objects.extra(tables=["django_site"]) qs.query.get_compiler(qs.db).as_sql() # test passes if this doesn't raise an exception. qs.query.get_compiler(qs.db).as_sql() @@ -1605,20 +1839,17 @@ class Queries5Tests(TestCase): def test_ticket9848(self): # Make sure that updates which only filter on sub-tables don't # inadvertently update the wrong records (bug #9848). - author_start = Author.objects.get(name='a1') - ranking_start = Ranking.objects.get(author__name='a1') + author_start = Author.objects.get(name="a1") + ranking_start = Ranking.objects.get(author__name="a1") # Make sure that the IDs from different tables don't happen to match. self.assertSequenceEqual( - Ranking.objects.filter(author__name='a1'), + Ranking.objects.filter(author__name="a1"), [self.rank3], ) - self.assertEqual( - Ranking.objects.filter(author__name='a1').update(rank=4636), - 1 - ) + self.assertEqual(Ranking.objects.filter(author__name="a1").update(rank=4636), 1) - r = Ranking.objects.get(author__name='a1') + r = Ranking.objects.get(author__name="a1") self.assertEqual(r.id, ranking_start.id) self.assertEqual(r.author.id, author_start.id) self.assertEqual(r.rank, 4636) @@ -1650,22 +1881,17 @@ class Queries5Tests(TestCase): def test_extra_select_literal_percent_s(self): # Allow %%s to escape select clauses + self.assertEqual(Note.objects.extra(select={"foo": "'%%s'"})[0].foo, "%s") self.assertEqual( - Note.objects.extra(select={'foo': "'%%s'"})[0].foo, - '%s' - ) - self.assertEqual( - Note.objects.extra(select={'foo': "'%%s bar %%s'"})[0].foo, - '%s bar %s' + Note.objects.extra(select={"foo": "'%%s bar %%s'"})[0].foo, "%s bar %s" ) self.assertEqual( - Note.objects.extra(select={'foo': "'bar %%s'"})[0].foo, - 'bar %s' + Note.objects.extra(select={"foo": "'bar %%s'"})[0].foo, "bar %s" ) def test_queryset_reuse(self): # Using querysets doesn't mutate aliases. - authors = Author.objects.filter(Q(name='a1') | Q(name='nonexistent')) + authors = Author.objects.filter(Q(name="a1") | Q(name="nonexistent")) self.assertEqual(Ranking.objects.filter(author__in=authors).get(), self.rank3) self.assertEqual(authors.count(), 1) @@ -1697,7 +1923,7 @@ class SubclassFKTests(TestCase): class CustomPkTests(TestCase): def test_ticket7371(self): - self.assertQuerysetEqual(Related.objects.order_by('custom'), []) + self.assertQuerysetEqual(Related.objects.order_by("custom"), []) class NullableRelOrderingTests(TestCase): @@ -1711,49 +1937,55 @@ class NullableRelOrderingTests(TestCase): # Ordering by model related to nullable relations should not change # the join type of already existing joins. Plaything.objects.create(name="p1") - s = SingleObject.objects.create(name='s') + s = SingleObject.objects.create(name="s") r = RelatedObject.objects.create(single=s, f=1) p2 = Plaything.objects.create(name="p2", others=r) - qs = Plaything.objects.all().filter(others__isnull=False).order_by('pk') - self.assertNotIn('JOIN', str(qs.query)) - qs = Plaything.objects.all().filter(others__f__isnull=False).order_by('pk') - self.assertIn('INNER', str(qs.query)) - qs = qs.order_by('others__single__name') + qs = Plaything.objects.all().filter(others__isnull=False).order_by("pk") + self.assertNotIn("JOIN", str(qs.query)) + qs = Plaything.objects.all().filter(others__f__isnull=False).order_by("pk") + self.assertIn("INNER", str(qs.query)) + qs = qs.order_by("others__single__name") # The ordering by others__single__pk will add one new join (to single) # and that join must be LEFT join. The already existing join to related # objects must be kept INNER. So, we have both an INNER and a LEFT join # in the query. - self.assertEqual(str(qs.query).count('LEFT'), 1) - self.assertEqual(str(qs.query).count('INNER'), 1) + self.assertEqual(str(qs.query).count("LEFT"), 1) + self.assertEqual(str(qs.query).count("INNER"), 1) self.assertSequenceEqual(qs, [p2]) class DisjunctiveFilterTests(TestCase): @classmethod def setUpTestData(cls): - cls.n1 = Note.objects.create(note='n1', misc='foo', id=1) - cls.e1 = ExtraInfo.objects.create(info='e1', note=cls.n1) + cls.n1 = Note.objects.create(note="n1", misc="foo", id=1) + cls.e1 = ExtraInfo.objects.create(info="e1", note=cls.n1) def test_ticket7872(self): # Another variation on the disjunctive filtering theme. # For the purposes of this regression test, it's important that there is no # Join object related to the LeafA we create. - l1 = LeafA.objects.create(data='first') + l1 = LeafA.objects.create(data="first") self.assertSequenceEqual(LeafA.objects.all(), [l1]) self.assertSequenceEqual( - LeafA.objects.filter(Q(data='first') | Q(join__b__data='second')), + LeafA.objects.filter(Q(data="first") | Q(join__b__data="second")), [l1], ) def test_ticket8283(self): # Checking that applying filters after a disjunction works correctly. self.assertSequenceEqual( - (ExtraInfo.objects.filter(note=self.n1) | ExtraInfo.objects.filter(info='e2')).filter(note=self.n1), + ( + ExtraInfo.objects.filter(note=self.n1) + | ExtraInfo.objects.filter(info="e2") + ).filter(note=self.n1), [self.e1], ) self.assertSequenceEqual( - (ExtraInfo.objects.filter(info='e2') | ExtraInfo.objects.filter(note=self.n1)).filter(note=self.n1), + ( + ExtraInfo.objects.filter(info="e2") + | ExtraInfo.objects.filter(note=self.n1) + ).filter(note=self.n1), [self.e1], ) @@ -1762,26 +1994,26 @@ class Queries6Tests(TestCase): @classmethod def setUpTestData(cls): generic = NamedCategory.objects.create(name="Generic") - cls.t1 = Tag.objects.create(name='t1', category=generic) - cls.t2 = Tag.objects.create(name='t2', parent=cls.t1, category=generic) - cls.t3 = Tag.objects.create(name='t3', parent=cls.t1) - cls.t4 = Tag.objects.create(name='t4', parent=cls.t3) - cls.t5 = Tag.objects.create(name='t5', parent=cls.t3) - n1 = Note.objects.create(note='n1', misc='foo', id=1) - cls.ann1 = Annotation.objects.create(name='a1', tag=cls.t1) + cls.t1 = Tag.objects.create(name="t1", category=generic) + cls.t2 = Tag.objects.create(name="t2", parent=cls.t1, category=generic) + cls.t3 = Tag.objects.create(name="t3", parent=cls.t1) + cls.t4 = Tag.objects.create(name="t4", parent=cls.t3) + cls.t5 = Tag.objects.create(name="t5", parent=cls.t3) + n1 = Note.objects.create(note="n1", misc="foo", id=1) + cls.ann1 = Annotation.objects.create(name="a1", tag=cls.t1) cls.ann1.notes.add(n1) - cls.ann2 = Annotation.objects.create(name='a2', tag=cls.t4) + cls.ann2 = Annotation.objects.create(name="a2", tag=cls.t4) def test_parallel_iterators(self): # Parallel iterators work. qs = Tag.objects.all() i1, i2 = iter(qs), iter(qs) - self.assertEqual(repr(next(i1)), '<Tag: t1>') - self.assertEqual(repr(next(i1)), '<Tag: t2>') - self.assertEqual(repr(next(i2)), '<Tag: t1>') - self.assertEqual(repr(next(i2)), '<Tag: t2>') - self.assertEqual(repr(next(i2)), '<Tag: t3>') - self.assertEqual(repr(next(i1)), '<Tag: t3>') + self.assertEqual(repr(next(i1)), "<Tag: t1>") + self.assertEqual(repr(next(i1)), "<Tag: t2>") + self.assertEqual(repr(next(i2)), "<Tag: t1>") + self.assertEqual(repr(next(i2)), "<Tag: t2>") + self.assertEqual(repr(next(i2)), "<Tag: t3>") + self.assertEqual(repr(next(i1)), "<Tag: t3>") qs = X.objects.all() self.assertFalse(qs) @@ -1791,10 +2023,7 @@ class Queries6Tests(TestCase): # Nested queries should not evaluate the inner query as part of constructing the # SQL (so we should see a nested query here, indicated by two "SELECT" calls). qs = Annotation.objects.filter(notes__in=Note.objects.filter(note="xyzzy")) - self.assertEqual( - qs.query.get_compiler(qs.db).as_sql()[0].count('SELECT'), - 2 - ) + self.assertEqual(qs.query.get_compiler(qs.db).as_sql()[0].count("SELECT"), 2) def test_tickets_8921_9188(self): # Incorrect SQL was being generated for certain types of exclude() @@ -1802,12 +2031,10 @@ class Queries6Tests(TestCase): # preemptively discovered cases). self.assertSequenceEqual( - PointerA.objects.filter(connection__pointerb__id=1), - [] + PointerA.objects.filter(connection__pointerb__id=1), [] ) self.assertSequenceEqual( - PointerA.objects.exclude(connection__pointerb__id=1), - [] + PointerA.objects.exclude(connection__pointerb__id=1), [] ) self.assertSequenceEqual( @@ -1839,62 +2066,73 @@ class Queries6Tests(TestCase): def test_ticket3739(self): # The all() method on querysets returns a copy of the queryset. - q1 = Tag.objects.order_by('name') + q1 = Tag.objects.order_by("name") self.assertIsNot(q1, q1.all()) def test_ticket_11320(self): - qs = Tag.objects.exclude(category=None).exclude(category__name='foo') - self.assertEqual(str(qs.query).count(' INNER JOIN '), 1) + qs = Tag.objects.exclude(category=None).exclude(category__name="foo") + self.assertEqual(str(qs.query).count(" INNER JOIN "), 1) def test_distinct_ordered_sliced_subquery_aggregation(self): - self.assertEqual(Tag.objects.distinct().order_by('category__name')[:3].count(), 3) + self.assertEqual( + Tag.objects.distinct().order_by("category__name")[:3].count(), 3 + ) def test_multiple_columns_with_the_same_name_slice(self): self.assertEqual( - list(Tag.objects.order_by('name').values_list('name', 'category__name')[:2]), - [('t1', 'Generic'), ('t2', 'Generic')], + list( + Tag.objects.order_by("name").values_list("name", "category__name")[:2] + ), + [("t1", "Generic"), ("t2", "Generic")], ) self.assertSequenceEqual( - Tag.objects.order_by('name').select_related('category')[:2], + Tag.objects.order_by("name").select_related("category")[:2], [self.t1, self.t2], ) self.assertEqual( - list(Tag.objects.order_by('-name').values_list('name', 'parent__name')[:2]), - [('t5', 't3'), ('t4', 't3')], + list(Tag.objects.order_by("-name").values_list("name", "parent__name")[:2]), + [("t5", "t3"), ("t4", "t3")], ) self.assertSequenceEqual( - Tag.objects.order_by('-name').select_related('parent')[:2], + Tag.objects.order_by("-name").select_related("parent")[:2], [self.t5, self.t4], ) def test_col_alias_quoted(self): with CaptureQueriesContext(connection) as captured_queries: self.assertEqual( - Tag.objects.values('parent').annotate( - tag_per_parent=Count('pk'), - ).aggregate(Max('tag_per_parent')), - {'tag_per_parent__max': 2}, + Tag.objects.values("parent") + .annotate( + tag_per_parent=Count("pk"), + ) + .aggregate(Max("tag_per_parent")), + {"tag_per_parent__max": 2}, ) - sql = captured_queries[0]['sql'] - self.assertIn('AS %s' % connection.ops.quote_name('col1'), sql) + sql = captured_queries[0]["sql"] + self.assertIn("AS %s" % connection.ops.quote_name("col1"), sql) class RawQueriesTests(TestCase): @classmethod def setUpTestData(cls): - Note.objects.create(note='n1', misc='foo', id=1) + Note.objects.create(note="n1", misc="foo", id=1) def test_ticket14729(self): # Test representation of raw query with one or few parameters passed as list query = "SELECT * FROM queries_note WHERE note = %s" - params = ['n1'] + params = ["n1"] qs = Note.objects.raw(query, params=params) - self.assertEqual(repr(qs), "<RawQuerySet: SELECT * FROM queries_note WHERE note = n1>") + self.assertEqual( + repr(qs), "<RawQuerySet: SELECT * FROM queries_note WHERE note = n1>" + ) query = "SELECT * FROM queries_note WHERE note = %s and misc = %s" - params = ['n1', 'foo'] + params = ["n1", "foo"] qs = Note.objects.raw(query, params=params) - self.assertEqual(repr(qs), "<RawQuerySet: SELECT * FROM queries_note WHERE note = n1 and misc = foo>") + self.assertEqual( + repr(qs), + "<RawQuerySet: SELECT * FROM queries_note WHERE note = n1 and misc = foo>", + ) class GeneratorExpressionTests(SimpleTestCase): @@ -1907,14 +2145,18 @@ class GeneratorExpressionTests(SimpleTestCase): class ComparisonTests(TestCase): @classmethod def setUpTestData(cls): - cls.n1 = Note.objects.create(note='n1', misc='foo', id=1) - e1 = ExtraInfo.objects.create(info='e1', note=cls.n1) - cls.a2 = Author.objects.create(name='a2', num=2002, extra=e1) + cls.n1 = Note.objects.create(note="n1", misc="foo", id=1) + e1 = ExtraInfo.objects.create(info="e1", note=cls.n1) + cls.a2 = Author.objects.create(name="a2", num=2002, extra=e1) def test_ticket8597(self): # Regression tests for case-insensitive comparisons - item_ab = Item.objects.create(name="a_b", created=datetime.datetime.now(), creator=self.a2, note=self.n1) - item_xy = Item.objects.create(name="x%y", created=datetime.datetime.now(), creator=self.a2, note=self.n1) + item_ab = Item.objects.create( + name="a_b", created=datetime.datetime.now(), creator=self.a2, note=self.n1 + ) + item_xy = Item.objects.create( + name="x%y", created=datetime.datetime.now(), creator=self.a2, note=self.n1 + ) self.assertSequenceEqual( Item.objects.filter(name__iexact="A_b"), [item_ab], @@ -1939,28 +2181,28 @@ class ExistsSql(TestCase): self.assertFalse(Tag.objects.exists()) # Ok - so the exist query worked - but did it include too many columns? self.assertEqual(len(captured_queries), 1) - qstr = captured_queries[0]['sql'] - id, name = connection.ops.quote_name('id'), connection.ops.quote_name('name') + qstr = captured_queries[0]["sql"] + id, name = connection.ops.quote_name("id"), connection.ops.quote_name("name") self.assertNotIn(id, qstr) self.assertNotIn(name, qstr) def test_ticket_18414(self): - Article.objects.create(name='one', created=datetime.datetime.now()) - Article.objects.create(name='one', created=datetime.datetime.now()) - Article.objects.create(name='two', created=datetime.datetime.now()) + Article.objects.create(name="one", created=datetime.datetime.now()) + Article.objects.create(name="one", created=datetime.datetime.now()) + Article.objects.create(name="two", created=datetime.datetime.now()) self.assertTrue(Article.objects.exists()) self.assertTrue(Article.objects.distinct().exists()) self.assertTrue(Article.objects.distinct()[1:3].exists()) self.assertFalse(Article.objects.distinct()[1:1].exists()) - @skipUnlessDBFeature('can_distinct_on_fields') + @skipUnlessDBFeature("can_distinct_on_fields") def test_ticket_18414_distinct_on(self): - Article.objects.create(name='one', created=datetime.datetime.now()) - Article.objects.create(name='one', created=datetime.datetime.now()) - Article.objects.create(name='two', created=datetime.datetime.now()) - self.assertTrue(Article.objects.distinct('name').exists()) - self.assertTrue(Article.objects.distinct('name')[1:2].exists()) - self.assertFalse(Article.objects.distinct('name')[2:3].exists()) + Article.objects.create(name="one", created=datetime.datetime.now()) + Article.objects.create(name="one", created=datetime.datetime.now()) + Article.objects.create(name="two", created=datetime.datetime.now()) + self.assertTrue(Article.objects.distinct("name").exists()) + self.assertTrue(Article.objects.distinct("name")[1:2].exists()) + self.assertFalse(Article.objects.distinct("name")[2:3].exists()) class QuerysetOrderedTests(unittest.TestCase): @@ -1976,64 +2218,78 @@ class QuerysetOrderedTests(unittest.TestCase): self.assertIs(Tag.objects.all().order_by().ordered, False) def test_explicit_ordering(self): - self.assertIs(Annotation.objects.all().order_by('id').ordered, True) + self.assertIs(Annotation.objects.all().order_by("id").ordered, True) def test_empty_queryset(self): self.assertIs(Annotation.objects.none().ordered, True) def test_order_by_extra(self): - self.assertIs(Annotation.objects.all().extra(order_by=['id']).ordered, True) + self.assertIs(Annotation.objects.all().extra(order_by=["id"]).ordered, True) def test_annotated_ordering(self): - qs = Annotation.objects.annotate(num_notes=Count('notes')) + qs = Annotation.objects.annotate(num_notes=Count("notes")) self.assertIs(qs.ordered, False) - self.assertIs(qs.order_by('num_notes').ordered, True) + self.assertIs(qs.order_by("num_notes").ordered, True) def test_annotated_default_ordering(self): - qs = Tag.objects.annotate(num_notes=Count('pk')) + qs = Tag.objects.annotate(num_notes=Count("pk")) self.assertIs(qs.ordered, False) - self.assertIs(qs.order_by('name').ordered, True) + self.assertIs(qs.order_by("name").ordered, True) def test_annotated_values_default_ordering(self): - qs = Tag.objects.values('name').annotate(num_notes=Count('pk')) + qs = Tag.objects.values("name").annotate(num_notes=Count("pk")) self.assertIs(qs.ordered, False) - self.assertIs(qs.order_by('name').ordered, True) + self.assertIs(qs.order_by("name").ordered, True) -@skipUnlessDBFeature('allow_sliced_subqueries_with_in') +@skipUnlessDBFeature("allow_sliced_subqueries_with_in") class SubqueryTests(TestCase): @classmethod def setUpTestData(cls): - NamedCategory.objects.create(id=1, name='first') - NamedCategory.objects.create(id=2, name='second') - NamedCategory.objects.create(id=3, name='third') - NamedCategory.objects.create(id=4, name='fourth') + NamedCategory.objects.create(id=1, name="first") + NamedCategory.objects.create(id=2, name="second") + NamedCategory.objects.create(id=3, name="third") + NamedCategory.objects.create(id=4, name="fourth") def test_ordered_subselect(self): "Subselects honor any manual ordering" - query = DumbCategory.objects.filter(id__in=DumbCategory.objects.order_by('-id')[0:2]) - self.assertEqual(set(query.values_list('id', flat=True)), {3, 4}) + query = DumbCategory.objects.filter( + id__in=DumbCategory.objects.order_by("-id")[0:2] + ) + self.assertEqual(set(query.values_list("id", flat=True)), {3, 4}) - query = DumbCategory.objects.filter(id__in=DumbCategory.objects.order_by('-id')[:2]) - self.assertEqual(set(query.values_list('id', flat=True)), {3, 4}) + query = DumbCategory.objects.filter( + id__in=DumbCategory.objects.order_by("-id")[:2] + ) + self.assertEqual(set(query.values_list("id", flat=True)), {3, 4}) - query = DumbCategory.objects.filter(id__in=DumbCategory.objects.order_by('-id')[1:2]) - self.assertEqual(set(query.values_list('id', flat=True)), {3}) + query = DumbCategory.objects.filter( + id__in=DumbCategory.objects.order_by("-id")[1:2] + ) + self.assertEqual(set(query.values_list("id", flat=True)), {3}) - query = DumbCategory.objects.filter(id__in=DumbCategory.objects.order_by('-id')[2:]) - self.assertEqual(set(query.values_list('id', flat=True)), {1, 2}) + query = DumbCategory.objects.filter( + id__in=DumbCategory.objects.order_by("-id")[2:] + ) + self.assertEqual(set(query.values_list("id", flat=True)), {1, 2}) def test_slice_subquery_and_query(self): """ Slice a query that has a sliced subquery """ - query = DumbCategory.objects.filter(id__in=DumbCategory.objects.order_by('-id')[0:2])[0:2] + query = DumbCategory.objects.filter( + id__in=DumbCategory.objects.order_by("-id")[0:2] + )[0:2] self.assertEqual({x.id for x in query}, {3, 4}) - query = DumbCategory.objects.filter(id__in=DumbCategory.objects.order_by('-id')[1:3])[1:3] + query = DumbCategory.objects.filter( + id__in=DumbCategory.objects.order_by("-id")[1:3] + )[1:3] self.assertEqual({x.id for x in query}, {3}) - query = DumbCategory.objects.filter(id__in=DumbCategory.objects.order_by('-id')[2:])[1:] + query = DumbCategory.objects.filter( + id__in=DumbCategory.objects.order_by("-id")[2:] + )[1:] self.assertEqual({x.id for x in query}, {2}) def test_related_sliced_subquery(self): @@ -2042,47 +2298,67 @@ class SubqueryTests(TestCase): refs #22434 """ generic = NamedCategory.objects.create(id=5, name="Generic") - t1 = Tag.objects.create(name='t1', category=generic) - t2 = Tag.objects.create(name='t2', category=generic) - ManagedModel.objects.create(data='mm1', tag=t1, public=True) - mm2 = ManagedModel.objects.create(data='mm2', tag=t2, public=True) + t1 = Tag.objects.create(name="t1", category=generic) + t2 = Tag.objects.create(name="t2", category=generic) + ManagedModel.objects.create(data="mm1", tag=t1, public=True) + mm2 = ManagedModel.objects.create(data="mm2", tag=t2, public=True) query = ManagedModel.normal_manager.filter( - tag__in=Tag.objects.order_by('-id')[:1] + tag__in=Tag.objects.order_by("-id")[:1] ) self.assertEqual({x.id for x in query}, {mm2.id}) def test_sliced_delete(self): "Delete queries can safely contain sliced subqueries" - DumbCategory.objects.filter(id__in=DumbCategory.objects.order_by('-id')[0:1]).delete() - self.assertEqual(set(DumbCategory.objects.values_list('id', flat=True)), {1, 2, 3}) + DumbCategory.objects.filter( + id__in=DumbCategory.objects.order_by("-id")[0:1] + ).delete() + self.assertEqual( + set(DumbCategory.objects.values_list("id", flat=True)), {1, 2, 3} + ) - DumbCategory.objects.filter(id__in=DumbCategory.objects.order_by('-id')[1:2]).delete() - self.assertEqual(set(DumbCategory.objects.values_list('id', flat=True)), {1, 3}) + DumbCategory.objects.filter( + id__in=DumbCategory.objects.order_by("-id")[1:2] + ).delete() + self.assertEqual(set(DumbCategory.objects.values_list("id", flat=True)), {1, 3}) - DumbCategory.objects.filter(id__in=DumbCategory.objects.order_by('-id')[1:]).delete() - self.assertEqual(set(DumbCategory.objects.values_list('id', flat=True)), {3}) + DumbCategory.objects.filter( + id__in=DumbCategory.objects.order_by("-id")[1:] + ).delete() + self.assertEqual(set(DumbCategory.objects.values_list("id", flat=True)), {3}) def test_distinct_ordered_sliced_subquery(self): # Implicit values('id'). self.assertSequenceEqual( NamedCategory.objects.filter( - id__in=NamedCategory.objects.distinct().order_by('name')[0:2], - ).order_by('name').values_list('name', flat=True), ['first', 'fourth'] + id__in=NamedCategory.objects.distinct().order_by("name")[0:2], + ) + .order_by("name") + .values_list("name", flat=True), + ["first", "fourth"], ) # Explicit values('id'). self.assertSequenceEqual( NamedCategory.objects.filter( - id__in=NamedCategory.objects.distinct().order_by('-name').values('id')[0:2], - ).order_by('name').values_list('name', flat=True), ['second', 'third'] + id__in=NamedCategory.objects.distinct() + .order_by("-name") + .values("id")[0:2], + ) + .order_by("name") + .values_list("name", flat=True), + ["second", "third"], ) # Annotated value. self.assertSequenceEqual( DumbCategory.objects.filter( - id__in=DumbCategory.objects.annotate( - double_id=F('id') * 2 - ).order_by('id').distinct().values('double_id')[0:2], - ).order_by('id').values_list('id', flat=True), [2, 4] + id__in=DumbCategory.objects.annotate(double_id=F("id") * 2) + .order_by("id") + .distinct() + .values("double_id")[0:2], + ) + .order_by("id") + .values_list("id", flat=True), + [2, 4], ) @@ -2090,10 +2366,18 @@ class QuerySetBitwiseOperationTests(TestCase): @classmethod def setUpTestData(cls): cls.school = School.objects.create() - cls.room_1 = Classroom.objects.create(school=cls.school, has_blackboard=False, name='Room 1') - cls.room_2 = Classroom.objects.create(school=cls.school, has_blackboard=True, name='Room 2') - cls.room_3 = Classroom.objects.create(school=cls.school, has_blackboard=True, name='Room 3') - cls.room_4 = Classroom.objects.create(school=cls.school, has_blackboard=False, name='Room 4') + cls.room_1 = Classroom.objects.create( + school=cls.school, has_blackboard=False, name="Room 1" + ) + cls.room_2 = Classroom.objects.create( + school=cls.school, has_blackboard=True, name="Room 2" + ) + cls.room_3 = Classroom.objects.create( + school=cls.school, has_blackboard=True, name="Room 3" + ) + cls.room_4 = Classroom.objects.create( + school=cls.school, has_blackboard=False, name="Room 4" + ) tag = Tag.objects.create() cls.annotation_1 = Annotation.objects.create(tag=tag) annotation_2 = Annotation.objects.create(tag=tag) @@ -2101,49 +2385,53 @@ class QuerySetBitwiseOperationTests(TestCase): cls.base_user_1 = BaseUser.objects.create(annotation=cls.annotation_1) cls.base_user_2 = BaseUser.objects.create(annotation=annotation_2) cls.task = Task.objects.create( - owner=cls.base_user_2, creator=cls.base_user_2, note=note, + owner=cls.base_user_2, + creator=cls.base_user_2, + note=note, ) - @skipUnlessDBFeature('allow_sliced_subqueries_with_in') + @skipUnlessDBFeature("allow_sliced_subqueries_with_in") def test_or_with_rhs_slice(self): qs1 = Classroom.objects.filter(has_blackboard=True) qs2 = Classroom.objects.filter(has_blackboard=False)[:1] self.assertCountEqual(qs1 | qs2, [self.room_1, self.room_2, self.room_3]) - @skipUnlessDBFeature('allow_sliced_subqueries_with_in') + @skipUnlessDBFeature("allow_sliced_subqueries_with_in") def test_or_with_lhs_slice(self): qs1 = Classroom.objects.filter(has_blackboard=True)[:1] qs2 = Classroom.objects.filter(has_blackboard=False) self.assertCountEqual(qs1 | qs2, [self.room_1, self.room_2, self.room_4]) - @skipUnlessDBFeature('allow_sliced_subqueries_with_in') + @skipUnlessDBFeature("allow_sliced_subqueries_with_in") def test_or_with_both_slice(self): qs1 = Classroom.objects.filter(has_blackboard=False)[:1] qs2 = Classroom.objects.filter(has_blackboard=True)[:1] self.assertCountEqual(qs1 | qs2, [self.room_1, self.room_2]) - @skipUnlessDBFeature('allow_sliced_subqueries_with_in') + @skipUnlessDBFeature("allow_sliced_subqueries_with_in") def test_or_with_both_slice_and_ordering(self): - qs1 = Classroom.objects.filter(has_blackboard=False).order_by('-pk')[:1] - qs2 = Classroom.objects.filter(has_blackboard=True).order_by('-name')[:1] + qs1 = Classroom.objects.filter(has_blackboard=False).order_by("-pk")[:1] + qs2 = Classroom.objects.filter(has_blackboard=True).order_by("-name")[:1] self.assertCountEqual(qs1 | qs2, [self.room_3, self.room_4]) def test_subquery_aliases(self): combined = School.objects.filter(pk__isnull=False) & School.objects.filter( - Exists(Classroom.objects.filter( - has_blackboard=True, - school=OuterRef('pk'), - )), + Exists( + Classroom.objects.filter( + has_blackboard=True, + school=OuterRef("pk"), + ) + ), ) self.assertSequenceEqual(combined, [self.school]) - nested_combined = School.objects.filter(pk__in=combined.values('pk')) + nested_combined = School.objects.filter(pk__in=combined.values("pk")) self.assertSequenceEqual(nested_combined, [self.school]) def test_conflicting_aliases_during_combine(self): qs1 = self.annotation_1.baseuser_set.all() qs2 = BaseUser.objects.filter( - Q(owner__note__in=self.annotation_1.notes.all()) | - Q(creator__note__in=self.annotation_1.notes.all()) + Q(owner__note__in=self.annotation_1.notes.all()) + | Q(creator__note__in=self.annotation_1.notes.all()) ) self.assertSequenceEqual(qs1, [self.base_user_1]) self.assertSequenceEqual(qs2, [self.base_user_2]) @@ -2152,12 +2440,11 @@ class QuerySetBitwiseOperationTests(TestCase): class CloneTests(TestCase): - def test_evaluated_queryset_as_argument(self): "#13227 -- If a queryset is already evaluated, it can still be used as a query arg" - n = Note(note='Test1', misc='misc') + n = Note(note="Test1", misc="misc") n.save() - e = ExtraInfo(info='good', note=n) + e = ExtraInfo(info="good", note=n) e.save() n_list = Note.objects.all() @@ -2169,7 +2456,7 @@ class CloneTests(TestCase): pickle.dumps(n_list) # Use the note queryset in a query, and evaluate # that query in a way that involves cloning. - self.assertEqual(ExtraInfo.objects.filter(note__in=n_list)[0].info, 'good') + self.assertEqual(ExtraInfo.objects.filter(note__in=n_list)[0].info, "good") def test_no_model_options_cloning(self): """ @@ -2179,9 +2466,11 @@ class CloneTests(TestCase): """ opts_class = type(Note._meta) note_deepcopy = getattr(opts_class, "__deepcopy__", None) - opts_class.__deepcopy__ = lambda obj, memo: self.fail("Model options shouldn't be cloned.") + opts_class.__deepcopy__ = lambda obj, memo: self.fail( + "Model options shouldn't be cloned." + ) try: - Note.objects.filter(pk__lte=F('pk') + 1).all() + Note.objects.filter(pk__lte=F("pk") + 1).all() finally: if note_deepcopy is None: delattr(opts_class, "__deepcopy__") @@ -2196,9 +2485,11 @@ class CloneTests(TestCase): """ opts_class = type(Note._meta.get_field("misc")) note_deepcopy = getattr(opts_class, "__deepcopy__", None) - opts_class.__deepcopy__ = lambda obj, memo: self.fail("Model fields shouldn't be cloned") + opts_class.__deepcopy__ = lambda obj, memo: self.fail( + "Model fields shouldn't be cloned" + ) try: - Note.objects.filter(note=F('misc')).all() + Note.objects.filter(note=F("misc")).all() finally: if note_deepcopy is None: delattr(opts_class, "__deepcopy__") @@ -2210,11 +2501,15 @@ class EmptyQuerySetTests(SimpleTestCase): def test_emptyqueryset_values(self): # #14366 -- Calling .values() on an empty QuerySet and then cloning # that should not cause an error - self.assertCountEqual(Number.objects.none().values('num').order_by('num'), []) + self.assertCountEqual(Number.objects.none().values("num").order_by("num"), []) def test_values_subquery(self): - self.assertCountEqual(Number.objects.filter(pk__in=Number.objects.none().values('pk')), []) - self.assertCountEqual(Number.objects.filter(pk__in=Number.objects.none().values_list('pk')), []) + self.assertCountEqual( + Number.objects.filter(pk__in=Number.objects.none().values("pk")), [] + ) + self.assertCountEqual( + Number.objects.filter(pk__in=Number.objects.none().values_list("pk")), [] + ) def test_ticket_19151(self): # #19151 -- Calling .values() or .values_list() on an empty QuerySet @@ -2236,138 +2531,158 @@ class ValuesQuerysetTests(TestCase): def test_extra_values(self): # testing for ticket 14930 issues - qs = Number.objects.extra(select={'value_plus_x': 'num+%s', 'value_minus_x': 'num-%s'}, select_params=(1, 2)) - qs = qs.order_by('value_minus_x') - qs = qs.values('num') - self.assertSequenceEqual(qs, [{'num': 72}]) + qs = Number.objects.extra( + select={"value_plus_x": "num+%s", "value_minus_x": "num-%s"}, + select_params=(1, 2), + ) + qs = qs.order_by("value_minus_x") + qs = qs.values("num") + self.assertSequenceEqual(qs, [{"num": 72}]) def test_extra_values_order_twice(self): # testing for ticket 14930 issues - qs = Number.objects.extra(select={'value_plus_one': 'num+1', 'value_minus_one': 'num-1'}) - qs = qs.order_by('value_minus_one').order_by('value_plus_one') - qs = qs.values('num') - self.assertSequenceEqual(qs, [{'num': 72}]) + qs = Number.objects.extra( + select={"value_plus_one": "num+1", "value_minus_one": "num-1"} + ) + qs = qs.order_by("value_minus_one").order_by("value_plus_one") + qs = qs.values("num") + self.assertSequenceEqual(qs, [{"num": 72}]) def test_extra_values_order_multiple(self): # Postgres doesn't allow constants in order by, so check for that. - qs = Number.objects.extra(select={ - 'value_plus_one': 'num+1', - 'value_minus_one': 'num-1', - 'constant_value': '1' - }) - qs = qs.order_by('value_plus_one', 'value_minus_one', 'constant_value') - qs = qs.values('num') - self.assertSequenceEqual(qs, [{'num': 72}]) + qs = Number.objects.extra( + select={ + "value_plus_one": "num+1", + "value_minus_one": "num-1", + "constant_value": "1", + } + ) + qs = qs.order_by("value_plus_one", "value_minus_one", "constant_value") + qs = qs.values("num") + self.assertSequenceEqual(qs, [{"num": 72}]) def test_extra_values_order_in_extra(self): # testing for ticket 14930 issues qs = Number.objects.extra( - select={'value_plus_one': 'num+1', 'value_minus_one': 'num-1'}, - order_by=['value_minus_one'], + select={"value_plus_one": "num+1", "value_minus_one": "num-1"}, + order_by=["value_minus_one"], ) - qs = qs.values('num') + qs = qs.values("num") def test_extra_select_params_values_order_in_extra(self): # testing for 23259 issue qs = Number.objects.extra( - select={'value_plus_x': 'num+%s'}, + select={"value_plus_x": "num+%s"}, select_params=[1], - order_by=['value_plus_x'], + order_by=["value_plus_x"], ) qs = qs.filter(num=72) - qs = qs.values('num') - self.assertSequenceEqual(qs, [{'num': 72}]) + qs = qs.values("num") + self.assertSequenceEqual(qs, [{"num": 72}]) def test_extra_multiple_select_params_values_order_by(self): # testing for 23259 issue - qs = Number.objects.extra(select={'value_plus_x': 'num+%s', 'value_minus_x': 'num-%s'}, select_params=(72, 72)) - qs = qs.order_by('value_minus_x') + qs = Number.objects.extra( + select={"value_plus_x": "num+%s", "value_minus_x": "num-%s"}, + select_params=(72, 72), + ) + qs = qs.order_by("value_minus_x") qs = qs.filter(num=1) - qs = qs.values('num') + qs = qs.values("num") self.assertSequenceEqual(qs, []) def test_extra_values_list(self): # testing for ticket 14930 issues - qs = Number.objects.extra(select={'value_plus_one': 'num+1'}) - qs = qs.order_by('value_plus_one') - qs = qs.values_list('num') + qs = Number.objects.extra(select={"value_plus_one": "num+1"}) + qs = qs.order_by("value_plus_one") + qs = qs.values_list("num") self.assertSequenceEqual(qs, [(72,)]) def test_flat_extra_values_list(self): # testing for ticket 14930 issues - qs = Number.objects.extra(select={'value_plus_one': 'num+1'}) - qs = qs.order_by('value_plus_one') - qs = qs.values_list('num', flat=True) + qs = Number.objects.extra(select={"value_plus_one": "num+1"}) + qs = qs.order_by("value_plus_one") + qs = qs.values_list("num", flat=True) self.assertSequenceEqual(qs, [72]) def test_field_error_values_list(self): # see #23443 - msg = "Cannot resolve keyword %r into field. Join on 'name' not permitted." % 'foo' + msg = ( + "Cannot resolve keyword %r into field. Join on 'name' not permitted." + % "foo" + ) with self.assertRaisesMessage(FieldError, msg): - Tag.objects.values_list('name__foo') + Tag.objects.values_list("name__foo") def test_named_values_list_flat(self): msg = "'flat' and 'named' can't be used together." with self.assertRaisesMessage(TypeError, msg): - Number.objects.values_list('num', flat=True, named=True) + Number.objects.values_list("num", flat=True, named=True) def test_named_values_list_bad_field_name(self): msg = "Type names and field names must be valid identifiers: '1'" with self.assertRaisesMessage(ValueError, msg): - Number.objects.extra(select={'1': 'num+1'}).values_list('1', named=True).first() + Number.objects.extra(select={"1": "num+1"}).values_list( + "1", named=True + ).first() def test_named_values_list_with_fields(self): - qs = Number.objects.extra(select={'num2': 'num+1'}).annotate(Count('id')) - values = qs.values_list('num', 'num2', named=True).first() - self.assertEqual(type(values).__name__, 'Row') - self.assertEqual(values._fields, ('num', 'num2')) + qs = Number.objects.extra(select={"num2": "num+1"}).annotate(Count("id")) + values = qs.values_list("num", "num2", named=True).first() + self.assertEqual(type(values).__name__, "Row") + self.assertEqual(values._fields, ("num", "num2")) self.assertEqual(values.num, 72) self.assertEqual(values.num2, 73) def test_named_values_list_without_fields(self): - qs = Number.objects.extra(select={'num2': 'num+1'}).annotate(Count('id')) + qs = Number.objects.extra(select={"num2": "num+1"}).annotate(Count("id")) values = qs.values_list(named=True).first() - self.assertEqual(type(values).__name__, 'Row') + self.assertEqual(type(values).__name__, "Row") self.assertEqual( values._fields, - ('num2', 'id', 'num', 'other_num', 'another_num', 'id__count'), + ("num2", "id", "num", "other_num", "another_num", "id__count"), ) self.assertEqual(values.num, 72) self.assertEqual(values.num2, 73) self.assertEqual(values.id__count, 1) def test_named_values_list_expression_with_default_alias(self): - expr = Count('id') - values = Number.objects.annotate(id__count1=expr).values_list(expr, 'id__count1', named=True).first() - self.assertEqual(values._fields, ('id__count2', 'id__count1')) + expr = Count("id") + values = ( + Number.objects.annotate(id__count1=expr) + .values_list(expr, "id__count1", named=True) + .first() + ) + self.assertEqual(values._fields, ("id__count2", "id__count1")) def test_named_values_list_expression(self): - expr = F('num') + 1 - qs = Number.objects.annotate(combinedexpression1=expr).values_list(expr, 'combinedexpression1', named=True) + expr = F("num") + 1 + qs = Number.objects.annotate(combinedexpression1=expr).values_list( + expr, "combinedexpression1", named=True + ) values = qs.first() - self.assertEqual(values._fields, ('combinedexpression2', 'combinedexpression1')) + self.assertEqual(values._fields, ("combinedexpression2", "combinedexpression1")) def test_named_values_pickle(self): - value = Number.objects.values_list('num', 'other_num', named=True).get() + value = Number.objects.values_list("num", "other_num", named=True).get() self.assertEqual(value, (72, None)) self.assertEqual(pickle.loads(pickle.dumps(value)), value) class QuerySetSupportsPythonIdioms(TestCase): - @classmethod def setUpTestData(cls): some_date = datetime.datetime(2014, 5, 16, 12, 1) cls.articles = [ - Article.objects.create(name=f'Article {i}', created=some_date) + Article.objects.create(name=f"Article {i}", created=some_date) for i in range(1, 8) ] def get_ordered_articles(self): - return Article.objects.all().order_by('name') + return Article.objects.all().order_by("name") def test_can_get_items_using_index_and_slice_notation(self): - self.assertEqual(self.get_ordered_articles()[0].name, 'Article 1') + self.assertEqual(self.get_ordered_articles()[0].name, "Article 1") self.assertSequenceEqual( self.get_ordered_articles()[1:3], [self.articles[1], self.articles[2]], @@ -2375,12 +2690,13 @@ class QuerySetSupportsPythonIdioms(TestCase): def test_slicing_with_steps_can_be_used(self): self.assertSequenceEqual( - self.get_ordered_articles()[::2], [ + self.get_ordered_articles()[::2], + [ self.articles[0], self.articles[2], self.articles[4], self.articles[6], - ] + ], ) def test_slicing_without_step_is_lazy(self): @@ -2396,7 +2712,9 @@ class QuerySetSupportsPythonIdioms(TestCase): self.get_ordered_articles()[0:5][0:2], [self.articles[0], self.articles[1]], ) - self.assertSequenceEqual(self.get_ordered_articles()[0:5][4:], [self.articles[4]]) + self.assertSequenceEqual( + self.get_ordered_articles()[0:5][4:], [self.articles[4]] + ) self.assertSequenceEqual(self.get_ordered_articles()[0:5][5:], []) # Some more tests! @@ -2408,7 +2726,9 @@ class QuerySetSupportsPythonIdioms(TestCase): self.get_ordered_articles()[2:][:2], [self.articles[2], self.articles[3]], ) - self.assertSequenceEqual(self.get_ordered_articles()[2:][2:3], [self.articles[4]]) + self.assertSequenceEqual( + self.get_ordered_articles()[2:][2:3], [self.articles[4]] + ) # Using an offset without a limit is also possible. self.assertSequenceEqual( @@ -2417,47 +2737,47 @@ class QuerySetSupportsPythonIdioms(TestCase): ) def test_slicing_cannot_filter_queryset_once_sliced(self): - msg = 'Cannot filter a query once a slice has been taken.' + msg = "Cannot filter a query once a slice has been taken." with self.assertRaisesMessage(TypeError, msg): Article.objects.all()[0:5].filter(id=1) def test_slicing_cannot_reorder_queryset_once_sliced(self): - msg = 'Cannot reorder a query once a slice has been taken.' + msg = "Cannot reorder a query once a slice has been taken." with self.assertRaisesMessage(TypeError, msg): - Article.objects.all()[0:5].order_by('id') + Article.objects.all()[0:5].order_by("id") def test_slicing_cannot_combine_queries_once_sliced(self): - msg = 'Cannot combine queries once a slice has been taken.' + msg = "Cannot combine queries once a slice has been taken." with self.assertRaisesMessage(TypeError, msg): Article.objects.all()[0:1] & Article.objects.all()[4:5] def test_slicing_negative_indexing_not_supported_for_single_element(self): """hint: inverting your ordering might do what you need""" - msg = 'Negative indexing is not supported.' + msg = "Negative indexing is not supported." with self.assertRaisesMessage(ValueError, msg): Article.objects.all()[-1] def test_slicing_negative_indexing_not_supported_for_range(self): """hint: inverting your ordering might do what you need""" - msg = 'Negative indexing is not supported.' + msg = "Negative indexing is not supported." with self.assertRaisesMessage(ValueError, msg): Article.objects.all()[0:-5] with self.assertRaisesMessage(ValueError, msg): Article.objects.all()[-1:] def test_invalid_index(self): - msg = 'QuerySet indices must be integers or slices, not str.' + msg = "QuerySet indices must be integers or slices, not str." with self.assertRaisesMessage(TypeError, msg): - Article.objects.all()['foo'] + Article.objects.all()["foo"] def test_can_get_number_of_items_in_queryset_using_standard_len(self): - self.assertEqual(len(Article.objects.filter(name__exact='Article 1')), 1) + self.assertEqual(len(Article.objects.filter(name__exact="Article 1")), 1) def test_can_combine_queries_using_and_and_or_operators(self): - s1 = Article.objects.filter(name__exact='Article 1') - s2 = Article.objects.filter(name__exact='Article 2') + s1 = Article.objects.filter(name__exact="Article 1") + s2 = Article.objects.filter(name__exact="Article 2") self.assertSequenceEqual( - (s1 | s2).order_by('name'), + (s1 | s2).order_by("name"), [self.articles[0], self.articles[1]], ) self.assertSequenceEqual(s1 & s2, []) @@ -2469,32 +2789,36 @@ class WeirdQuerysetSlicingTests(TestCase): Number.objects.create(num=1) Number.objects.create(num=2) - Article.objects.create(name='one', created=datetime.datetime.now()) - Article.objects.create(name='two', created=datetime.datetime.now()) - Article.objects.create(name='three', created=datetime.datetime.now()) - Article.objects.create(name='four', created=datetime.datetime.now()) + Article.objects.create(name="one", created=datetime.datetime.now()) + Article.objects.create(name="two", created=datetime.datetime.now()) + Article.objects.create(name="three", created=datetime.datetime.now()) + Article.objects.create(name="four", created=datetime.datetime.now()) - food = Food.objects.create(name='spam') - Eaten.objects.create(meal='spam with eggs', food=food) + food = Food.objects.create(name="spam") + Eaten.objects.create(meal="spam with eggs", food=food) def test_tickets_7698_10202(self): # People like to slice with '0' as the high-water mark. self.assertQuerysetEqual(Article.objects.all()[0:0], []) self.assertQuerysetEqual(Article.objects.all()[0:0][:10], []) self.assertEqual(Article.objects.all()[:0].count(), 0) - msg = 'Cannot change a query once a slice has been taken.' + msg = "Cannot change a query once a slice has been taken." with self.assertRaisesMessage(TypeError, msg): - Article.objects.all()[:0].latest('created') + Article.objects.all()[:0].latest("created") def test_empty_resultset_sql(self): # ticket #12192 self.assertNumQueries(0, lambda: list(Number.objects.all()[1:1])) def test_empty_sliced_subquery(self): - self.assertEqual(Eaten.objects.filter(food__in=Food.objects.all()[0:0]).count(), 0) + self.assertEqual( + Eaten.objects.filter(food__in=Food.objects.all()[0:0]).count(), 0 + ) def test_empty_sliced_subquery_exclude(self): - self.assertEqual(Eaten.objects.exclude(food__in=Food.objects.all()[0:0]).count(), 1) + self.assertEqual( + Eaten.objects.exclude(food__in=Food.objects.all()[0:0]).count(), 1 + ) def test_zero_length_values_slicing(self): n = 42 @@ -2506,14 +2830,16 @@ class WeirdQuerysetSlicingTests(TestCase): class EscapingTests(TestCase): def test_ticket_7302(self): # Reserved names are appropriately escaped - r_a = ReservedName.objects.create(name='a', order=42) - r_b = ReservedName.objects.create(name='b', order=37) + r_a = ReservedName.objects.create(name="a", order=42) + r_b = ReservedName.objects.create(name="b", order=37) self.assertSequenceEqual( - ReservedName.objects.all().order_by('order'), + ReservedName.objects.all().order_by("order"), [r_b, r_a], ) self.assertSequenceEqual( - ReservedName.objects.extra(select={'stuff': 'name'}, order_by=('order', 'stuff')), + ReservedName.objects.extra( + select={"stuff": "name"}, order_by=("order", "stuff") + ), [r_b, r_a], ) @@ -2534,16 +2860,20 @@ class ToFieldTests(TestCase): apple = Food.objects.create(name="apple") lunch = Eaten.objects.create(food=apple, meal="lunch") self.assertEqual( - set(Eaten.objects.filter(food__in=Food.objects.filter(name='apple'))), - {lunch} + set(Eaten.objects.filter(food__in=Food.objects.filter(name="apple"))), + {lunch}, ) self.assertEqual( - set(Eaten.objects.filter(food__in=Food.objects.filter(name='apple').values('eaten__meal'))), - set() + set( + Eaten.objects.filter( + food__in=Food.objects.filter(name="apple").values("eaten__meal") + ) + ), + set(), ) self.assertEqual( - set(Food.objects.filter(eaten__in=Eaten.objects.filter(meal='lunch'))), - {apple} + set(Food.objects.filter(eaten__in=Eaten.objects.filter(meal="lunch"))), + {apple}, ) def test_nested_in_subquery(self): @@ -2565,8 +2895,7 @@ class ToFieldTests(TestCase): lunch_pear = Eaten.objects.create(food=pear, meal="dinner") self.assertEqual( - set(Food.objects.filter(eaten__in=[lunch_apple, lunch_pear])), - {apple, pear} + set(Food.objects.filter(eaten__in=[lunch_apple, lunch_pear])), {apple, pear} ) def test_single_object(self): @@ -2574,45 +2903,35 @@ class ToFieldTests(TestCase): lunch = Eaten.objects.create(food=apple, meal="lunch") dinner = Eaten.objects.create(food=apple, meal="dinner") - self.assertEqual( - set(Eaten.objects.filter(food=apple)), - {lunch, dinner} - ) + self.assertEqual(set(Eaten.objects.filter(food=apple)), {lunch, dinner}) def test_single_object_reverse(self): apple = Food.objects.create(name="apple") lunch = Eaten.objects.create(food=apple, meal="lunch") - self.assertEqual( - set(Food.objects.filter(eaten=lunch)), - {apple} - ) + self.assertEqual(set(Food.objects.filter(eaten=lunch)), {apple}) def test_recursive_fk(self): node1 = Node.objects.create(num=42) node2 = Node.objects.create(num=1, parent=node1) - self.assertEqual( - list(Node.objects.filter(parent=node1)), - [node2] - ) + self.assertEqual(list(Node.objects.filter(parent=node1)), [node2]) def test_recursive_fk_reverse(self): node1 = Node.objects.create(num=42) node2 = Node.objects.create(num=1, parent=node1) - self.assertEqual( - list(Node.objects.filter(node=node2)), - [node1] - ) + self.assertEqual(list(Node.objects.filter(node=node2)), [node1]) class IsNullTests(TestCase): def test_primary_key(self): - custom = CustomPk.objects.create(name='pk') + custom = CustomPk.objects.create(name="pk") null = Related.objects.create() notnull = Related.objects.create(custom=custom) - self.assertSequenceEqual(Related.objects.filter(custom__isnull=False), [notnull]) + self.assertSequenceEqual( + Related.objects.filter(custom__isnull=False), [notnull] + ) self.assertSequenceEqual(Related.objects.filter(custom__isnull=True), [null]) def test_to_field(self): @@ -2636,39 +2955,36 @@ class ConditionalTests(TestCase): @classmethod def setUpTestData(cls): generic = NamedCategory.objects.create(name="Generic") - t1 = Tag.objects.create(name='t1', category=generic) - Tag.objects.create(name='t2', parent=t1, category=generic) - t3 = Tag.objects.create(name='t3', parent=t1) - Tag.objects.create(name='t4', parent=t3) - Tag.objects.create(name='t5', parent=t3) + t1 = Tag.objects.create(name="t1", category=generic) + Tag.objects.create(name="t2", parent=t1, category=generic) + t3 = Tag.objects.create(name="t3", parent=t1) + Tag.objects.create(name="t4", parent=t3) + Tag.objects.create(name="t5", parent=t3) def test_infinite_loop(self): # If you're not careful, it's possible to introduce infinite loops via # default ordering on foreign keys in a cycle. We detect that. - with self.assertRaisesMessage(FieldError, 'Infinite loop caused by ordering.'): + with self.assertRaisesMessage(FieldError, "Infinite loop caused by ordering."): list(LoopX.objects.all()) # Force queryset evaluation with list() - with self.assertRaisesMessage(FieldError, 'Infinite loop caused by ordering.'): + with self.assertRaisesMessage(FieldError, "Infinite loop caused by ordering."): list(LoopZ.objects.all()) # Force queryset evaluation with list() # Note that this doesn't cause an infinite loop, since the default # ordering on the Tag model is empty (and thus defaults to using "id" # for the related field). - self.assertEqual(len(Tag.objects.order_by('parent')), 5) + self.assertEqual(len(Tag.objects.order_by("parent")), 5) # ... but you can still order in a non-recursive fashion among linked # fields (the previous test failed because the default ordering was # recursive). - self.assertQuerysetEqual( - LoopX.objects.all().order_by('y__x__y__x__id'), - [] - ) + self.assertQuerysetEqual(LoopX.objects.all().order_by("y__x__y__x__id"), []) # When grouping without specifying ordering, we add an explicit "ORDER BY NULL" # portion in MySQL to prevent unnecessary sorting. - @skipUnlessDBFeature('requires_explicit_null_ordering_when_grouping') + @skipUnlessDBFeature("requires_explicit_null_ordering_when_grouping") def test_null_ordering_added(self): - query = Tag.objects.values_list('parent_id', flat=True).order_by().query - query.group_by = ['parent_id'] + query = Tag.objects.values_list("parent_id", flat=True).order_by().query + query.group_by = ["parent_id"] sql = query.get_compiler(DEFAULT_DB_ALIAS).as_sql()[0] fragment = "ORDER BY " pos = sql.find(fragment) @@ -2685,29 +3001,36 @@ class ConditionalTests(TestCase): Number.objects.bulk_create(Number(num=num) for num in numbers) for number in [1000, 1001, 2000, len(numbers)]: with self.subTest(number=number): - self.assertEqual(Number.objects.filter(num__in=numbers[:number]).count(), number) + self.assertEqual( + Number.objects.filter(num__in=numbers[:number]).count(), number + ) class UnionTests(unittest.TestCase): """ Tests for the union of two querysets. Bug #12252. """ + @classmethod def setUpTestData(cls): objectas = [] objectbs = [] objectcs = [] - a_info = ['one', 'two', 'three'] + a_info = ["one", "two", "three"] for name in a_info: o = ObjectA(name=name) o.save() objectas.append(o) - b_info = [('un', 1, objectas[0]), ('deux', 2, objectas[0]), ('trois', 3, objectas[2])] + b_info = [ + ("un", 1, objectas[0]), + ("deux", 2, objectas[0]), + ("trois", 3, objectas[2]), + ] for name, number, objecta in b_info: o = ObjectB(name=name, num=number, objecta=objecta) o.save() objectbs.append(o) - c_info = [('ein', objectas[2], objectbs[2]), ('zwei', objectas[1], objectbs[1])] + c_info = [("ein", objectas[2], objectbs[2]), ("zwei", objectas[1], objectbs[1])] for name, objecta, objectb in c_info: o = ObjectC(name=name, objecta=objecta, objectb=objectb) o.save() @@ -2719,33 +3042,37 @@ class UnionTests(unittest.TestCase): self.assertEqual(set(filter(Q2) | filter(Q1)), set(filter(Q1 | Q2))) def test_A_AB(self): - Q1 = Q(name='two') - Q2 = Q(objectb__name='deux') + Q1 = Q(name="two") + Q2 = Q(objectb__name="deux") self.check_union(ObjectA, Q1, Q2) def test_A_AB2(self): - Q1 = Q(name='two') - Q2 = Q(objectb__name='deux', objectb__num=2) + Q1 = Q(name="two") + Q2 = Q(objectb__name="deux", objectb__num=2) self.check_union(ObjectA, Q1, Q2) def test_AB_ACB(self): - Q1 = Q(objectb__name='deux') - Q2 = Q(objectc__objectb__name='deux') + Q1 = Q(objectb__name="deux") + Q2 = Q(objectc__objectb__name="deux") self.check_union(ObjectA, Q1, Q2) def test_BAB_BAC(self): - Q1 = Q(objecta__objectb__name='deux') - Q2 = Q(objecta__objectc__name='ein') + Q1 = Q(objecta__objectb__name="deux") + Q2 = Q(objecta__objectc__name="ein") self.check_union(ObjectB, Q1, Q2) def test_BAB_BACB(self): - Q1 = Q(objecta__objectb__name='deux') - Q2 = Q(objecta__objectc__objectb__name='trois') + Q1 = Q(objecta__objectb__name="deux") + Q2 = Q(objecta__objectc__objectb__name="trois") self.check_union(ObjectB, Q1, Q2) def test_BA_BCA__BAB_BAC_BCA(self): - Q1 = Q(objecta__name='one', objectc__objecta__name='two') - Q2 = Q(objecta__objectc__name='ein', objectc__objecta__name='three', objecta__objectb__name='trois') + Q1 = Q(objecta__name="one", objectc__objecta__name="two") + Q2 = Q( + objecta__objectc__name="ein", + objectc__objecta__name="three", + objecta__objectb__name="trois", + ) self.check_union(ObjectB, Q1, Q2) @@ -2760,72 +3087,84 @@ class DefaultValuesInsertTest(TestCase): class ExcludeTests(TestCase): @classmethod def setUpTestData(cls): - f1 = Food.objects.create(name='apples') - cls.f2 = Food.objects.create(name='oranges') - Eaten.objects.create(food=f1, meal='dinner') - cls.j1 = Job.objects.create(name='Manager') - cls.r1 = Responsibility.objects.create(description='Playing golf') - cls.j2 = Job.objects.create(name='Programmer') - cls.r2 = Responsibility.objects.create(description='Programming') + f1 = Food.objects.create(name="apples") + cls.f2 = Food.objects.create(name="oranges") + Eaten.objects.create(food=f1, meal="dinner") + cls.j1 = Job.objects.create(name="Manager") + cls.r1 = Responsibility.objects.create(description="Playing golf") + cls.j2 = Job.objects.create(name="Programmer") + cls.r2 = Responsibility.objects.create(description="Programming") JobResponsibilities.objects.create(job=cls.j1, responsibility=cls.r1) JobResponsibilities.objects.create(job=cls.j2, responsibility=cls.r2) def test_to_field(self): self.assertSequenceEqual( - Food.objects.exclude(eaten__meal='dinner'), + Food.objects.exclude(eaten__meal="dinner"), [self.f2], ) self.assertSequenceEqual( - Job.objects.exclude(responsibilities__description='Playing golf'), + Job.objects.exclude(responsibilities__description="Playing golf"), [self.j2], ) self.assertSequenceEqual( - Responsibility.objects.exclude(jobs__name='Manager'), + Responsibility.objects.exclude(jobs__name="Manager"), [self.r2], ) def test_ticket14511(self): - alex = Person.objects.get_or_create(name='Alex')[0] - jane = Person.objects.get_or_create(name='Jane')[0] + alex = Person.objects.get_or_create(name="Alex")[0] + jane = Person.objects.get_or_create(name="Jane")[0] - oracle = Company.objects.get_or_create(name='Oracle')[0] - google = Company.objects.get_or_create(name='Google')[0] - microsoft = Company.objects.get_or_create(name='Microsoft')[0] - intel = Company.objects.get_or_create(name='Intel')[0] + oracle = Company.objects.get_or_create(name="Oracle")[0] + google = Company.objects.get_or_create(name="Google")[0] + microsoft = Company.objects.get_or_create(name="Microsoft")[0] + intel = Company.objects.get_or_create(name="Intel")[0] def employ(employer, employee, title): - Employment.objects.get_or_create(employee=employee, employer=employer, title=title) + Employment.objects.get_or_create( + employee=employee, employer=employer, title=title + ) - employ(oracle, alex, 'Engineer') - employ(oracle, alex, 'Developer') - employ(google, alex, 'Engineer') - employ(google, alex, 'Manager') - employ(microsoft, alex, 'Manager') - employ(intel, alex, 'Manager') + employ(oracle, alex, "Engineer") + employ(oracle, alex, "Developer") + employ(google, alex, "Engineer") + employ(google, alex, "Manager") + employ(microsoft, alex, "Manager") + employ(intel, alex, "Manager") - employ(microsoft, jane, 'Developer') - employ(intel, jane, 'Manager') + employ(microsoft, jane, "Developer") + employ(intel, jane, "Manager") - alex_tech_employers = alex.employers.filter( - employment__title__in=('Engineer', 'Developer')).distinct().order_by('name') + alex_tech_employers = ( + alex.employers.filter(employment__title__in=("Engineer", "Developer")) + .distinct() + .order_by("name") + ) self.assertSequenceEqual(alex_tech_employers, [google, oracle]) - alex_nontech_employers = alex.employers.exclude( - employment__title__in=('Engineer', 'Developer')).distinct().order_by('name') + alex_nontech_employers = ( + alex.employers.exclude(employment__title__in=("Engineer", "Developer")) + .distinct() + .order_by("name") + ) self.assertSequenceEqual(alex_nontech_employers, [google, intel, microsoft]) def test_exclude_reverse_fk_field_ref(self): tag = Tag.objects.create() - Note.objects.create(tag=tag, note='note') - annotation = Annotation.objects.create(name='annotation', tag=tag) - self.assertEqual(Annotation.objects.exclude(tag__note__note=F('name')).get(), annotation) + Note.objects.create(tag=tag, note="note") + annotation = Annotation.objects.create(name="annotation", tag=tag) + self.assertEqual( + Annotation.objects.exclude(tag__note__note=F("name")).get(), annotation + ) def test_exclude_with_circular_fk_relation(self): - self.assertEqual(ObjectB.objects.exclude(objecta__objectb__name=F('name')).count(), 0) + self.assertEqual( + ObjectB.objects.exclude(objecta__objectb__name=F("name")).count(), 0 + ) def test_subquery_exclude_outerref(self): qs = JobResponsibilities.objects.filter( - Exists(Responsibility.objects.exclude(jobs=OuterRef('job'))), + Exists(Responsibility.objects.exclude(jobs=OuterRef("job"))), ) self.assertTrue(qs.exists()) self.r1.delete() @@ -2835,44 +3174,46 @@ class ExcludeTests(TestCase): number = Number.objects.create(num=1, other_num=1) Number.objects.create(num=2, other_num=2, another_num=2) self.assertSequenceEqual( - Number.objects.exclude(other_num=F('another_num')), + Number.objects.exclude(other_num=F("another_num")), [number], ) self.assertSequenceEqual( - Number.objects.exclude(num=F('another_num')), + Number.objects.exclude(num=F("another_num")), [number], ) def test_exclude_multivalued_exists(self): with CaptureQueriesContext(connection) as captured_queries: self.assertSequenceEqual( - Job.objects.exclude(responsibilities__description='Programming'), + Job.objects.exclude(responsibilities__description="Programming"), [self.j1], ) - self.assertIn('exists', captured_queries[0]['sql'].lower()) + self.assertIn("exists", captured_queries[0]["sql"].lower()) def test_exclude_subquery(self): subquery = JobResponsibilities.objects.filter( - responsibility__description='bar', + responsibility__description="bar", ) | JobResponsibilities.objects.exclude( - job__responsibilities__description='foo', + job__responsibilities__description="foo", ) self.assertCountEqual( Job.objects.annotate( - responsibility=subquery.filter( - job=OuterRef('name'), - ).values('id')[:1] + responsibility=subquery.filter(job=OuterRef("name"),).values( + "id" + )[:1] ), [self.j1, self.j2], ) def test_exclude_unsaved_o2o_object(self): - jack = Staff.objects.create(name='jack') + jack = Staff.objects.create(name="jack") jack_staff = StaffUser.objects.create(staff=jack) - unsaved_object = Staff(name='jane') + unsaved_object = Staff(name="jane") self.assertIsNone(unsaved_object.pk) - self.assertSequenceEqual(StaffUser.objects.exclude(staff=unsaved_object), [jack_staff]) + self.assertSequenceEqual( + StaffUser.objects.exclude(staff=unsaved_object), [jack_staff] + ) class ExcludeTest17600(TestCase): @@ -2880,6 +3221,7 @@ class ExcludeTest17600(TestCase): Some regressiontests for ticket #17600. Some of these likely duplicate other existing tests. """ + @classmethod def setUpTestData(cls): # Create a few Orders. @@ -2948,7 +3290,8 @@ class ExcludeTest17600(TestCase): """ self.assertEqual( list(Order.objects.exclude(items__status=1).distinct()), - list(Order.objects.exclude(Q(items__status=1)).distinct())) + list(Order.objects.exclude(Q(items__status=1)).distinct()), + ) def test_exclude_with_q_is_equal_to_plain_exclude_variation(self): """ @@ -2957,7 +3300,8 @@ class ExcludeTest17600(TestCase): """ self.assertEqual( list(Order.objects.exclude(items__status=1)), - list(Order.objects.exclude(Q(items__status=1)).distinct())) + list(Order.objects.exclude(Q(items__status=1)).distinct()), + ) @unittest.expectedFailure def test_only_orders_with_all_items_having_status_1(self): @@ -2974,40 +3318,50 @@ class ExcludeTest17600(TestCase): class Exclude15786(TestCase): """Regression test for #15786""" + def test_ticket15786(self): - c1 = SimpleCategory.objects.create(name='c1') - c2 = SimpleCategory.objects.create(name='c2') + c1 = SimpleCategory.objects.create(name="c1") + c2 = SimpleCategory.objects.create(name="c2") OneToOneCategory.objects.create(category=c1) OneToOneCategory.objects.create(category=c2) rel = CategoryRelationship.objects.create(first=c1, second=c2) self.assertEqual( CategoryRelationship.objects.exclude( - first__onetoonecategory=F('second__onetoonecategory') - ).get(), rel + first__onetoonecategory=F("second__onetoonecategory") + ).get(), + rel, ) class NullInExcludeTest(TestCase): @classmethod def setUpTestData(cls): - NullableName.objects.create(name='i1') + NullableName.objects.create(name="i1") NullableName.objects.create() def test_null_in_exclude_qs(self): - none_val = '' if connection.features.interprets_empty_strings_as_nulls else None + none_val = "" if connection.features.interprets_empty_strings_as_nulls else None self.assertQuerysetEqual( NullableName.objects.exclude(name__in=[]), - ['i1', none_val], attrgetter('name')) + ["i1", none_val], + attrgetter("name"), + ) self.assertQuerysetEqual( - NullableName.objects.exclude(name__in=['i1']), - [none_val], attrgetter('name')) + NullableName.objects.exclude(name__in=["i1"]), + [none_val], + attrgetter("name"), + ) self.assertQuerysetEqual( - NullableName.objects.exclude(name__in=['i3']), - ['i1', none_val], attrgetter('name')) - inner_qs = NullableName.objects.filter(name='i1').values_list('name') + NullableName.objects.exclude(name__in=["i3"]), + ["i1", none_val], + attrgetter("name"), + ) + inner_qs = NullableName.objects.filter(name="i1").values_list("name") self.assertQuerysetEqual( NullableName.objects.exclude(name__in=inner_qs), - [none_val], attrgetter('name')) + [none_val], + attrgetter("name"), + ) # The inner queryset wasn't executed - it should be turned # into subquery above self.assertIs(inner_qs._result_cache, None) @@ -3020,16 +3374,17 @@ class NullInExcludeTest(TestCase): abstract away. """ self.assertQuerysetEqual( - NullableName.objects.exclude(name__in=[None]), - ['i1'], attrgetter('name')) + NullableName.objects.exclude(name__in=[None]), ["i1"], attrgetter("name") + ) def test_double_exclude(self): self.assertEqual( - list(NullableName.objects.filter(~~Q(name='i1'))), - list(NullableName.objects.filter(Q(name='i1')))) + list(NullableName.objects.filter(~~Q(name="i1"))), + list(NullableName.objects.filter(Q(name="i1"))), + ) self.assertNotIn( - 'IS NOT NULL', - str(NullableName.objects.filter(~~Q(name='i1')).query)) + "IS NOT NULL", str(NullableName.objects.filter(~~Q(name="i1")).query) + ) class EmptyStringsAsNullTest(TestCase): @@ -3038,27 +3393,29 @@ class EmptyStringsAsNullTest(TestCase): The reason for these tests is that Oracle treats '' as NULL, and this can cause problems in query construction. Refs #17957. """ + @classmethod def setUpTestData(cls): - cls.nc = NamedCategory.objects.create(name='') + cls.nc = NamedCategory.objects.create(name="") def test_direct_exclude(self): self.assertQuerysetEqual( - NamedCategory.objects.exclude(name__in=['nonexistent']), - [self.nc.pk], attrgetter('pk') + NamedCategory.objects.exclude(name__in=["nonexistent"]), + [self.nc.pk], + attrgetter("pk"), ) def test_joined_exclude(self): self.assertQuerysetEqual( - DumbCategory.objects.exclude(namedcategory__name__in=['nonexistent']), - [self.nc.pk], attrgetter('pk') + DumbCategory.objects.exclude(namedcategory__name__in=["nonexistent"]), + [self.nc.pk], + attrgetter("pk"), ) def test_21001(self): - foo = NamedCategory.objects.create(name='foo') + foo = NamedCategory.objects.create(name="foo") self.assertQuerysetEqual( - NamedCategory.objects.exclude(name=''), - [foo.pk], attrgetter('pk') + NamedCategory.objects.exclude(name=""), [foo.pk], attrgetter("pk") ) @@ -3078,7 +3435,7 @@ class ProxyQueryCleanupTest(TestCase): class WhereNodeTest(SimpleTestCase): class DummyNode: def as_sql(self, compiler, connection): - return 'dummy', [] + return "dummy", [] class MockCompiler: def compile(self, node): @@ -3093,66 +3450,64 @@ class WhereNodeTest(SimpleTestCase): with self.assertRaises(EmptyResultSet): w.as_sql(compiler, connection) w.negate() - self.assertEqual(w.as_sql(compiler, connection), ('', [])) + self.assertEqual(w.as_sql(compiler, connection), ("", [])) w = WhereNode(children=[self.DummyNode(), self.DummyNode()]) - self.assertEqual(w.as_sql(compiler, connection), ('(dummy AND dummy)', [])) + self.assertEqual(w.as_sql(compiler, connection), ("(dummy AND dummy)", [])) w.negate() - self.assertEqual(w.as_sql(compiler, connection), ('NOT (dummy AND dummy)', [])) + self.assertEqual(w.as_sql(compiler, connection), ("NOT (dummy AND dummy)", [])) w = WhereNode(children=[NothingNode(), self.DummyNode()]) with self.assertRaises(EmptyResultSet): w.as_sql(compiler, connection) w.negate() - self.assertEqual(w.as_sql(compiler, connection), ('', [])) + self.assertEqual(w.as_sql(compiler, connection), ("", [])) def test_empty_full_handling_disjunction(self): compiler = WhereNodeTest.MockCompiler() - w = WhereNode(children=[NothingNode()], connector='OR') + w = WhereNode(children=[NothingNode()], connector="OR") with self.assertRaises(EmptyResultSet): w.as_sql(compiler, connection) w.negate() - self.assertEqual(w.as_sql(compiler, connection), ('', [])) - w = WhereNode(children=[self.DummyNode(), self.DummyNode()], connector='OR') - self.assertEqual(w.as_sql(compiler, connection), ('(dummy OR dummy)', [])) + self.assertEqual(w.as_sql(compiler, connection), ("", [])) + w = WhereNode(children=[self.DummyNode(), self.DummyNode()], connector="OR") + self.assertEqual(w.as_sql(compiler, connection), ("(dummy OR dummy)", [])) w.negate() - self.assertEqual(w.as_sql(compiler, connection), ('NOT (dummy OR dummy)', [])) - w = WhereNode(children=[NothingNode(), self.DummyNode()], connector='OR') - self.assertEqual(w.as_sql(compiler, connection), ('dummy', [])) + self.assertEqual(w.as_sql(compiler, connection), ("NOT (dummy OR dummy)", [])) + w = WhereNode(children=[NothingNode(), self.DummyNode()], connector="OR") + self.assertEqual(w.as_sql(compiler, connection), ("dummy", [])) w.negate() - self.assertEqual(w.as_sql(compiler, connection), ('NOT (dummy)', [])) + self.assertEqual(w.as_sql(compiler, connection), ("NOT (dummy)", [])) def test_empty_nodes(self): compiler = WhereNodeTest.MockCompiler() empty_w = WhereNode() w = WhereNode(children=[empty_w, empty_w]) - self.assertEqual(w.as_sql(compiler, connection), ('', [])) + self.assertEqual(w.as_sql(compiler, connection), ("", [])) w.negate() with self.assertRaises(EmptyResultSet): w.as_sql(compiler, connection) - w.connector = 'OR' + w.connector = "OR" with self.assertRaises(EmptyResultSet): w.as_sql(compiler, connection) w.negate() - self.assertEqual(w.as_sql(compiler, connection), ('', [])) - w = WhereNode(children=[empty_w, NothingNode()], connector='OR') - self.assertEqual(w.as_sql(compiler, connection), ('', [])) - w = WhereNode(children=[empty_w, NothingNode()], connector='AND') + self.assertEqual(w.as_sql(compiler, connection), ("", [])) + w = WhereNode(children=[empty_w, NothingNode()], connector="OR") + self.assertEqual(w.as_sql(compiler, connection), ("", [])) + w = WhereNode(children=[empty_w, NothingNode()], connector="AND") with self.assertRaises(EmptyResultSet): w.as_sql(compiler, connection) class QuerySetExceptionTests(SimpleTestCase): def test_iter_exceptions(self): - qs = ExtraInfo.objects.only('author') + qs = ExtraInfo.objects.only("author") msg = "'ManyToOneRel' object has no attribute 'attname'" with self.assertRaisesMessage(AttributeError, msg): list(qs) def test_invalid_order_by(self): - msg = ( - "Cannot resolve keyword '*' into field. Choices are: created, id, name" - ) + msg = "Cannot resolve keyword '*' into field. Choices are: created, id, name" with self.assertRaisesMessage(FieldError, msg): - Article.objects.order_by('*') + Article.objects.order_by("*") def test_invalid_order_by_raw_column_alias(self): msg = ( @@ -3161,7 +3516,7 @@ class QuerySetExceptionTests(SimpleTestCase): "note, note_id, tags" ) with self.assertRaisesMessage(FieldError, msg): - Item.objects.values('creator__name').order_by('queries_author.name') + Item.objects.values("creator__name").order_by("queries_author.name") def test_invalid_queryset_model(self): msg = 'Cannot use QuerySet for "Article": Use a QuerySet for "ExtraInfo".' @@ -3172,12 +3527,12 @@ class QuerySetExceptionTests(SimpleTestCase): class NullJoinPromotionOrTest(TestCase): @classmethod def setUpTestData(cls): - cls.d1 = ModelD.objects.create(name='foo') - d2 = ModelD.objects.create(name='bar') - cls.a1 = ModelA.objects.create(name='a1', d=cls.d1) - c = ModelC.objects.create(name='c') - b = ModelB.objects.create(name='b', c=c) - cls.a2 = ModelA.objects.create(name='a2', b=b, d=d2) + cls.d1 = ModelD.objects.create(name="foo") + d2 = ModelD.objects.create(name="bar") + cls.a1 = ModelA.objects.create(name="a1", d=cls.d1) + c = ModelC.objects.create(name="c") + b = ModelB.objects.create(name="b", c=c) + cls.a2 = ModelA.objects.create(name="a2", b=b, d=d2) def test_ticket_17886(self): # The first Q-object is generating the match, the rest of the filters @@ -3185,101 +3540,108 @@ class NullJoinPromotionOrTest(TestCase): # problem here was that b__name generates a LOUTER JOIN, then # b__c__name generates join to c, which the ORM tried to promote but # failed as that join isn't nullable. - q_obj = ( - Q(d__name='foo') | - Q(b__name='foo') | - Q(b__c__name='foo') - ) + q_obj = Q(d__name="foo") | Q(b__name="foo") | Q(b__c__name="foo") qset = ModelA.objects.filter(q_obj) self.assertEqual(list(qset), [self.a1]) # We generate one INNER JOIN to D. The join is direct and not nullable # so we can use INNER JOIN for it. However, we can NOT use INNER JOIN # for the b->c join, as a->b is nullable. - self.assertEqual(str(qset.query).count('INNER JOIN'), 1) + self.assertEqual(str(qset.query).count("INNER JOIN"), 1) def test_isnull_filter_promotion(self): qs = ModelA.objects.filter(Q(b__name__isnull=True)) - self.assertEqual(str(qs.query).count('LEFT OUTER'), 1) + self.assertEqual(str(qs.query).count("LEFT OUTER"), 1) self.assertEqual(list(qs), [self.a1]) qs = ModelA.objects.filter(~Q(b__name__isnull=True)) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) self.assertEqual(list(qs), [self.a2]) qs = ModelA.objects.filter(~~Q(b__name__isnull=True)) - self.assertEqual(str(qs.query).count('LEFT OUTER'), 1) + self.assertEqual(str(qs.query).count("LEFT OUTER"), 1) self.assertEqual(list(qs), [self.a1]) qs = ModelA.objects.filter(Q(b__name__isnull=False)) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) self.assertEqual(list(qs), [self.a2]) qs = ModelA.objects.filter(~Q(b__name__isnull=False)) - self.assertEqual(str(qs.query).count('LEFT OUTER'), 1) + self.assertEqual(str(qs.query).count("LEFT OUTER"), 1) self.assertEqual(list(qs), [self.a1]) qs = ModelA.objects.filter(~~Q(b__name__isnull=False)) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) self.assertEqual(list(qs), [self.a2]) def test_null_join_demotion(self): qs = ModelA.objects.filter(Q(b__name__isnull=False) & Q(b__name__isnull=True)) - self.assertIn(' INNER JOIN ', str(qs.query)) + self.assertIn(" INNER JOIN ", str(qs.query)) qs = ModelA.objects.filter(Q(b__name__isnull=True) & Q(b__name__isnull=False)) - self.assertIn(' INNER JOIN ', str(qs.query)) + self.assertIn(" INNER JOIN ", str(qs.query)) qs = ModelA.objects.filter(Q(b__name__isnull=False) | Q(b__name__isnull=True)) - self.assertIn(' LEFT OUTER JOIN ', str(qs.query)) + self.assertIn(" LEFT OUTER JOIN ", str(qs.query)) qs = ModelA.objects.filter(Q(b__name__isnull=True) | Q(b__name__isnull=False)) - self.assertIn(' LEFT OUTER JOIN ', str(qs.query)) + self.assertIn(" LEFT OUTER JOIN ", str(qs.query)) def test_ticket_21366(self): - n = Note.objects.create(note='n', misc='m') - e = ExtraInfo.objects.create(info='info', note=n) - a = Author.objects.create(name='Author1', num=1, extra=e) + n = Note.objects.create(note="n", misc="m") + e = ExtraInfo.objects.create(info="info", note=n) + a = Author.objects.create(name="Author1", num=1, extra=e) Ranking.objects.create(rank=1, author=a) - r1 = Report.objects.create(name='Foo', creator=a) - r2 = Report.objects.create(name='Bar') - Report.objects.create(name='Bar', creator=a) + r1 = Report.objects.create(name="Foo", creator=a) + r2 = Report.objects.create(name="Bar") + Report.objects.create(name="Bar", creator=a) qs = Report.objects.filter( - Q(creator__ranking__isnull=True) | - Q(creator__ranking__rank=1, name='Foo') + Q(creator__ranking__isnull=True) | Q(creator__ranking__rank=1, name="Foo") ) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 2) - self.assertEqual(str(qs.query).count(' JOIN '), 2) - self.assertSequenceEqual(qs.order_by('name'), [r2, r1]) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 2) + self.assertEqual(str(qs.query).count(" JOIN "), 2) + self.assertSequenceEqual(qs.order_by("name"), [r2, r1]) def test_ticket_21748(self): - i1 = Identifier.objects.create(name='i1') - i2 = Identifier.objects.create(name='i2') - i3 = Identifier.objects.create(name='i3') + i1 = Identifier.objects.create(name="i1") + i2 = Identifier.objects.create(name="i2") + i3 = Identifier.objects.create(name="i3") Program.objects.create(identifier=i1) Channel.objects.create(identifier=i1) Program.objects.create(identifier=i2) - self.assertSequenceEqual(Identifier.objects.filter(program=None, channel=None), [i3]) - self.assertSequenceEqual(Identifier.objects.exclude(program=None, channel=None).order_by('name'), [i1, i2]) + self.assertSequenceEqual( + Identifier.objects.filter(program=None, channel=None), [i3] + ) + self.assertSequenceEqual( + Identifier.objects.exclude(program=None, channel=None).order_by("name"), + [i1, i2], + ) def test_ticket_21748_double_negated_and(self): - i1 = Identifier.objects.create(name='i1') - i2 = Identifier.objects.create(name='i2') - Identifier.objects.create(name='i3') + i1 = Identifier.objects.create(name="i1") + i2 = Identifier.objects.create(name="i2") + Identifier.objects.create(name="i3") p1 = Program.objects.create(identifier=i1) c1 = Channel.objects.create(identifier=i1) Program.objects.create(identifier=i2) # Check the ~~Q() (or equivalently .exclude(~Q)) works like Q() for # join promotion. - qs1_doubleneg = Identifier.objects.exclude(~Q(program__id=p1.id, channel__id=c1.id)).order_by('pk') - qs1_filter = Identifier.objects.filter(program__id=p1.id, channel__id=c1.id).order_by('pk') + qs1_doubleneg = Identifier.objects.exclude( + ~Q(program__id=p1.id, channel__id=c1.id) + ).order_by("pk") + qs1_filter = Identifier.objects.filter( + program__id=p1.id, channel__id=c1.id + ).order_by("pk") self.assertQuerysetEqual(qs1_doubleneg, qs1_filter, lambda x: x) - self.assertEqual(str(qs1_filter.query).count('JOIN'), - str(qs1_doubleneg.query).count('JOIN')) - self.assertEqual(2, str(qs1_doubleneg.query).count('INNER JOIN')) - self.assertEqual(str(qs1_filter.query).count('INNER JOIN'), - str(qs1_doubleneg.query).count('INNER JOIN')) + self.assertEqual( + str(qs1_filter.query).count("JOIN"), str(qs1_doubleneg.query).count("JOIN") + ) + self.assertEqual(2, str(qs1_doubleneg.query).count("INNER JOIN")) + self.assertEqual( + str(qs1_filter.query).count("INNER JOIN"), + str(qs1_doubleneg.query).count("INNER JOIN"), + ) def test_ticket_21748_double_negated_or(self): - i1 = Identifier.objects.create(name='i1') - i2 = Identifier.objects.create(name='i2') - Identifier.objects.create(name='i3') + i1 = Identifier.objects.create(name="i1") + i2 = Identifier.objects.create(name="i2") + Identifier.objects.create(name="i3") p1 = Program.objects.create(identifier=i1) c1 = Channel.objects.create(identifier=i1) p2 = Program.objects.create(identifier=i2) @@ -3287,21 +3649,24 @@ class NullJoinPromotionOrTest(TestCase): # joined, program INNER joined qs1_filter = Identifier.objects.filter( Q(program__id=p2.id, channel__id=c1.id) | Q(program__id=p1.id) - ).order_by('pk') + ).order_by("pk") qs1_doubleneg = Identifier.objects.exclude( ~Q(Q(program__id=p2.id, channel__id=c1.id) | Q(program__id=p1.id)) - ).order_by('pk') + ).order_by("pk") self.assertQuerysetEqual(qs1_doubleneg, qs1_filter, lambda x: x) - self.assertEqual(str(qs1_filter.query).count('JOIN'), - str(qs1_doubleneg.query).count('JOIN')) - self.assertEqual(1, str(qs1_doubleneg.query).count('INNER JOIN')) - self.assertEqual(str(qs1_filter.query).count('INNER JOIN'), - str(qs1_doubleneg.query).count('INNER JOIN')) + self.assertEqual( + str(qs1_filter.query).count("JOIN"), str(qs1_doubleneg.query).count("JOIN") + ) + self.assertEqual(1, str(qs1_doubleneg.query).count("INNER JOIN")) + self.assertEqual( + str(qs1_filter.query).count("INNER JOIN"), + str(qs1_doubleneg.query).count("INNER JOIN"), + ) def test_ticket_21748_complex_filter(self): - i1 = Identifier.objects.create(name='i1') - i2 = Identifier.objects.create(name='i2') - Identifier.objects.create(name='i3') + i1 = Identifier.objects.create(name="i1") + i2 = Identifier.objects.create(name="i2") + Identifier.objects.create(name="i3") p1 = Program.objects.create(identifier=i1) c1 = Channel.objects.create(identifier=i1) p2 = Program.objects.create(identifier=i2) @@ -3310,16 +3675,16 @@ class NullJoinPromotionOrTest(TestCase): # another query where this isn't done. qs1 = Identifier.objects.filter( ~Q(~Q(program__id=p2.id, channel__id=c1.id) & Q(program__id=p1.id)) - ).order_by('pk') + ).order_by("pk") qs2 = Identifier.objects.filter( Q(Q(program__id=p2.id, channel__id=c1.id) | ~Q(program__id=p1.id)) - ).order_by('pk') + ).order_by("pk") self.assertQuerysetEqual(qs1, qs2, lambda x: x) - self.assertEqual(str(qs1.query).count('JOIN'), - str(qs2.query).count('JOIN')) - self.assertEqual(0, str(qs1.query).count('INNER JOIN')) - self.assertEqual(str(qs1.query).count('INNER JOIN'), - str(qs2.query).count('INNER JOIN')) + self.assertEqual(str(qs1.query).count("JOIN"), str(qs2.query).count("JOIN")) + self.assertEqual(0, str(qs1.query).count("INNER JOIN")) + self.assertEqual( + str(qs1.query).count("INNER JOIN"), str(qs2.query).count("INNER JOIN") + ) class ReverseJoinTrimmingTest(TestCase): @@ -3329,7 +3694,7 @@ class ReverseJoinTrimmingTest(TestCase): # can't be done, ever. t = Tag.objects.create() qs = Tag.objects.filter(annotation__tag=t.pk) - self.assertIn('INNER JOIN', str(qs.query)) + self.assertIn("INNER JOIN", str(qs.query)) self.assertEqual(list(qs), []) @@ -3338,33 +3703,34 @@ class JoinReuseTest(TestCase): The queries reuse joins sensibly (for example, direct joins are always reused). """ + def test_fk_reuse(self): - qs = Annotation.objects.filter(tag__name='foo').filter(tag__name='bar') - self.assertEqual(str(qs.query).count('JOIN'), 1) + qs = Annotation.objects.filter(tag__name="foo").filter(tag__name="bar") + self.assertEqual(str(qs.query).count("JOIN"), 1) def test_fk_reuse_select_related(self): - qs = Annotation.objects.filter(tag__name='foo').select_related('tag') - self.assertEqual(str(qs.query).count('JOIN'), 1) + qs = Annotation.objects.filter(tag__name="foo").select_related("tag") + self.assertEqual(str(qs.query).count("JOIN"), 1) def test_fk_reuse_annotation(self): - qs = Annotation.objects.filter(tag__name='foo').annotate(cnt=Count('tag__name')) - self.assertEqual(str(qs.query).count('JOIN'), 1) + qs = Annotation.objects.filter(tag__name="foo").annotate(cnt=Count("tag__name")) + self.assertEqual(str(qs.query).count("JOIN"), 1) def test_fk_reuse_disjunction(self): - qs = Annotation.objects.filter(Q(tag__name='foo') | Q(tag__name='bar')) - self.assertEqual(str(qs.query).count('JOIN'), 1) + qs = Annotation.objects.filter(Q(tag__name="foo") | Q(tag__name="bar")) + self.assertEqual(str(qs.query).count("JOIN"), 1) def test_fk_reuse_order_by(self): - qs = Annotation.objects.filter(tag__name='foo').order_by('tag__name') - self.assertEqual(str(qs.query).count('JOIN'), 1) + qs = Annotation.objects.filter(tag__name="foo").order_by("tag__name") + self.assertEqual(str(qs.query).count("JOIN"), 1) def test_revo2o_reuse(self): - qs = Detail.objects.filter(member__name='foo').filter(member__name='foo') - self.assertEqual(str(qs.query).count('JOIN'), 1) + qs = Detail.objects.filter(member__name="foo").filter(member__name="foo") + self.assertEqual(str(qs.query).count("JOIN"), 1) def test_revfk_noreuse(self): - qs = Author.objects.filter(report__name='r4').filter(report__name='r1') - self.assertEqual(str(qs.query).count('JOIN'), 2) + qs = Author.objects.filter(report__name="r4").filter(report__name="r1") + self.assertEqual(str(qs.query).count("JOIN"), 2) def test_inverted_q_across_relations(self): """ @@ -3378,23 +3744,26 @@ class JoinReuseTest(TestCase): hogward = School.objects.create() Student.objects.create(school=springfield_elementary) hp = Student.objects.create(school=hogward) - Classroom.objects.create(school=hogward, name='Potion') - Classroom.objects.create(school=springfield_elementary, name='Main') + Classroom.objects.create(school=hogward, name="Potion") + Classroom.objects.create(school=springfield_elementary, name="Main") qs = Student.objects.filter( - ~(Q(school__classroom__name='Main') & Q(school__classroom__has_blackboard=None)) + ~( + Q(school__classroom__name="Main") + & Q(school__classroom__has_blackboard=None) + ) ) self.assertSequenceEqual(qs, [hp]) class DisjunctionPromotionTests(TestCase): def test_disjunction_promotion_select_related(self): - fk1 = FK1.objects.create(f1='f1', f2='f2') + fk1 = FK1.objects.create(f1="f1", f2="f2") basea = BaseA.objects.create(a=fk1) qs = BaseA.objects.filter(Q(a=fk1) | Q(b=2)) - self.assertEqual(str(qs.query).count(' JOIN '), 0) - qs = qs.select_related('a', 'b') - self.assertEqual(str(qs.query).count(' INNER JOIN '), 0) - self.assertEqual(str(qs.query).count(' LEFT OUTER JOIN '), 2) + self.assertEqual(str(qs.query).count(" JOIN "), 0) + qs = qs.select_related("a", "b") + self.assertEqual(str(qs.query).count(" INNER JOIN "), 0) + self.assertEqual(str(qs.query).count(" LEFT OUTER JOIN "), 2) with self.assertNumQueries(1): self.assertSequenceEqual(qs, [basea]) self.assertEqual(qs[0].a, fk1) @@ -3403,161 +3772,162 @@ class DisjunctionPromotionTests(TestCase): def test_disjunction_promotion1(self): # Pre-existing join, add two ORed filters to the same join, # all joins can be INNER JOINS. - qs = BaseA.objects.filter(a__f1='foo') - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) - qs = qs.filter(Q(b__f1='foo') | Q(b__f2='foo')) - self.assertEqual(str(qs.query).count('INNER JOIN'), 2) + qs = BaseA.objects.filter(a__f1="foo") + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) + qs = qs.filter(Q(b__f1="foo") | Q(b__f2="foo")) + self.assertEqual(str(qs.query).count("INNER JOIN"), 2) # Reverse the order of AND and OR filters. - qs = BaseA.objects.filter(Q(b__f1='foo') | Q(b__f2='foo')) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) - qs = qs.filter(a__f1='foo') - self.assertEqual(str(qs.query).count('INNER JOIN'), 2) + qs = BaseA.objects.filter(Q(b__f1="foo") | Q(b__f2="foo")) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) + qs = qs.filter(a__f1="foo") + self.assertEqual(str(qs.query).count("INNER JOIN"), 2) def test_disjunction_promotion2(self): - qs = BaseA.objects.filter(a__f1='foo') - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) + qs = BaseA.objects.filter(a__f1="foo") + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) # Now we have two different joins in an ORed condition, these # must be OUTER joins. The pre-existing join should remain INNER. - qs = qs.filter(Q(b__f1='foo') | Q(c__f2='foo')) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 2) + qs = qs.filter(Q(b__f1="foo") | Q(c__f2="foo")) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 2) # Reverse case. - qs = BaseA.objects.filter(Q(b__f1='foo') | Q(c__f2='foo')) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 2) - qs = qs.filter(a__f1='foo') - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 2) + qs = BaseA.objects.filter(Q(b__f1="foo") | Q(c__f2="foo")) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 2) + qs = qs.filter(a__f1="foo") + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 2) def test_disjunction_promotion3(self): - qs = BaseA.objects.filter(a__f2='bar') - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) + qs = BaseA.objects.filter(a__f2="bar") + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) # The ANDed a__f2 filter allows us to use keep using INNER JOIN # even inside the ORed case. If the join to a__ returns nothing, # the ANDed filter for a__f2 can't be true. - qs = qs.filter(Q(a__f1='foo') | Q(b__f2='foo')) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 1) + qs = qs.filter(Q(a__f1="foo") | Q(b__f2="foo")) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 1) def test_disjunction_promotion3_demote(self): # This one needs demotion logic: the first filter causes a to be # outer joined, the second filter makes it inner join again. - qs = BaseA.objects.filter( - Q(a__f1='foo') | Q(b__f2='foo')).filter(a__f2='bar') - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 1) + qs = BaseA.objects.filter(Q(a__f1="foo") | Q(b__f2="foo")).filter(a__f2="bar") + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 1) def test_disjunction_promotion4_demote(self): qs = BaseA.objects.filter(Q(a=1) | Q(a=2)) - self.assertEqual(str(qs.query).count('JOIN'), 0) + self.assertEqual(str(qs.query).count("JOIN"), 0) # Demote needed for the "a" join. It is marked as outer join by # above filter (even if it is trimmed away). - qs = qs.filter(a__f1='foo') - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) + qs = qs.filter(a__f1="foo") + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) def test_disjunction_promotion4(self): - qs = BaseA.objects.filter(a__f1='foo') - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) + qs = BaseA.objects.filter(a__f1="foo") + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) qs = qs.filter(Q(a=1) | Q(a=2)) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) def test_disjunction_promotion5_demote(self): qs = BaseA.objects.filter(Q(a=1) | Q(a=2)) # Note that the above filters on a force the join to an # inner join even if it is trimmed. - self.assertEqual(str(qs.query).count('JOIN'), 0) - qs = qs.filter(Q(a__f1='foo') | Q(b__f1='foo')) + self.assertEqual(str(qs.query).count("JOIN"), 0) + qs = qs.filter(Q(a__f1="foo") | Q(b__f1="foo")) # So, now the a__f1 join doesn't need promotion. - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) # But b__f1 does. - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 1) - qs = BaseA.objects.filter(Q(a__f1='foo') | Q(b__f1='foo')) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 1) + qs = BaseA.objects.filter(Q(a__f1="foo") | Q(b__f1="foo")) # Now the join to a is created as LOUTER - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 2) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 2) qs = qs.filter(Q(a=1) | Q(a=2)) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 1) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 1) def test_disjunction_promotion6(self): qs = BaseA.objects.filter(Q(a=1) | Q(a=2)) - self.assertEqual(str(qs.query).count('JOIN'), 0) - qs = BaseA.objects.filter(Q(a__f1='foo') & Q(b__f1='foo')) - self.assertEqual(str(qs.query).count('INNER JOIN'), 2) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 0) - - qs = BaseA.objects.filter(Q(a__f1='foo') & Q(b__f1='foo')) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 0) - self.assertEqual(str(qs.query).count('INNER JOIN'), 2) + self.assertEqual(str(qs.query).count("JOIN"), 0) + qs = BaseA.objects.filter(Q(a__f1="foo") & Q(b__f1="foo")) + self.assertEqual(str(qs.query).count("INNER JOIN"), 2) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 0) + + qs = BaseA.objects.filter(Q(a__f1="foo") & Q(b__f1="foo")) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 0) + self.assertEqual(str(qs.query).count("INNER JOIN"), 2) qs = qs.filter(Q(a=1) | Q(a=2)) - self.assertEqual(str(qs.query).count('INNER JOIN'), 2) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 0) + self.assertEqual(str(qs.query).count("INNER JOIN"), 2) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 0) def test_disjunction_promotion7(self): qs = BaseA.objects.filter(Q(a=1) | Q(a=2)) - self.assertEqual(str(qs.query).count('JOIN'), 0) - qs = BaseA.objects.filter(Q(a__f1='foo') | (Q(b__f1='foo') & Q(a__f1='bar'))) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 1) + self.assertEqual(str(qs.query).count("JOIN"), 0) + qs = BaseA.objects.filter(Q(a__f1="foo") | (Q(b__f1="foo") & Q(a__f1="bar"))) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 1) qs = BaseA.objects.filter( - (Q(a__f1='foo') | Q(b__f1='foo')) & (Q(a__f1='bar') | Q(c__f1='foo')) + (Q(a__f1="foo") | Q(b__f1="foo")) & (Q(a__f1="bar") | Q(c__f1="foo")) ) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 3) - self.assertEqual(str(qs.query).count('INNER JOIN'), 0) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 3) + self.assertEqual(str(qs.query).count("INNER JOIN"), 0) qs = BaseA.objects.filter( - Q(a__f1='foo') | Q(a__f1='bar') & (Q(b__f1='bar') | Q(c__f1='foo')) + Q(a__f1="foo") | Q(a__f1="bar") & (Q(b__f1="bar") | Q(c__f1="foo")) ) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 2) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 2) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) def test_disjunction_promotion_fexpression(self): - qs = BaseA.objects.filter(Q(a__f1=F('b__f1')) | Q(b__f1='foo')) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 1) - self.assertEqual(str(qs.query).count('INNER JOIN'), 1) - qs = BaseA.objects.filter(Q(a__f1=F('c__f1')) | Q(b__f1='foo')) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 3) - qs = BaseA.objects.filter(Q(a__f1=F('b__f1')) | Q(a__f2=F('b__f2')) | Q(c__f1='foo')) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 3) - qs = BaseA.objects.filter(Q(a__f1=F('c__f1')) | (Q(pk=1) & Q(pk=2))) - self.assertEqual(str(qs.query).count('LEFT OUTER JOIN'), 2) - self.assertEqual(str(qs.query).count('INNER JOIN'), 0) + qs = BaseA.objects.filter(Q(a__f1=F("b__f1")) | Q(b__f1="foo")) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 1) + self.assertEqual(str(qs.query).count("INNER JOIN"), 1) + qs = BaseA.objects.filter(Q(a__f1=F("c__f1")) | Q(b__f1="foo")) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 3) + qs = BaseA.objects.filter( + Q(a__f1=F("b__f1")) | Q(a__f2=F("b__f2")) | Q(c__f1="foo") + ) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 3) + qs = BaseA.objects.filter(Q(a__f1=F("c__f1")) | (Q(pk=1) & Q(pk=2))) + self.assertEqual(str(qs.query).count("LEFT OUTER JOIN"), 2) + self.assertEqual(str(qs.query).count("INNER JOIN"), 0) class ManyToManyExcludeTest(TestCase): def test_exclude_many_to_many(self): - i_extra = Identifier.objects.create(name='extra') - i_program = Identifier.objects.create(name='program') + i_extra = Identifier.objects.create(name="extra") + i_program = Identifier.objects.create(name="program") program = Program.objects.create(identifier=i_program) - i_channel = Identifier.objects.create(name='channel') + i_channel = Identifier.objects.create(name="channel") channel = Channel.objects.create(identifier=i_channel) channel.programs.add(program) # channel contains 'program1', so all Identifiers except that one # should be returned self.assertSequenceEqual( - Identifier.objects.exclude(program__channel=channel).order_by('name'), + Identifier.objects.exclude(program__channel=channel).order_by("name"), [i_channel, i_extra], ) self.assertSequenceEqual( - Identifier.objects.exclude(program__channel=None).order_by('name'), + Identifier.objects.exclude(program__channel=None).order_by("name"), [i_program], ) def test_ticket_12823(self): - pg3 = Page.objects.create(text='pg3') - pg2 = Page.objects.create(text='pg2') - pg1 = Page.objects.create(text='pg1') - pa1 = Paragraph.objects.create(text='pa1') + pg3 = Page.objects.create(text="pg3") + pg2 = Page.objects.create(text="pg2") + pg1 = Page.objects.create(text="pg1") + pa1 = Paragraph.objects.create(text="pa1") pa1.page.set([pg1, pg2]) - pa2 = Paragraph.objects.create(text='pa2') + pa2 = Paragraph.objects.create(text="pa2") pa2.page.set([pg2, pg3]) - pa3 = Paragraph.objects.create(text='pa3') - ch1 = Chapter.objects.create(title='ch1', paragraph=pa1) - ch2 = Chapter.objects.create(title='ch2', paragraph=pa2) - ch3 = Chapter.objects.create(title='ch3', paragraph=pa3) - b1 = Book.objects.create(title='b1', chapter=ch1) - b2 = Book.objects.create(title='b2', chapter=ch2) - b3 = Book.objects.create(title='b3', chapter=ch3) - q = Book.objects.exclude(chapter__paragraph__page__text='pg1') - self.assertNotIn('IS NOT NULL', str(q.query)) + pa3 = Paragraph.objects.create(text="pa3") + ch1 = Chapter.objects.create(title="ch1", paragraph=pa1) + ch2 = Chapter.objects.create(title="ch2", paragraph=pa2) + ch3 = Chapter.objects.create(title="ch3", paragraph=pa3) + b1 = Book.objects.create(title="b1", chapter=ch1) + b2 = Book.objects.create(title="b2", chapter=ch2) + b3 = Book.objects.create(title="b3", chapter=ch3) + q = Book.objects.exclude(chapter__paragraph__page__text="pg1") + self.assertNotIn("IS NOT NULL", str(q.query)) self.assertEqual(len(q), 2) self.assertNotIn(b1, q) self.assertIn(b2, q) @@ -3566,12 +3936,12 @@ class ManyToManyExcludeTest(TestCase): class RelabelCloneTest(TestCase): def test_ticket_19964(self): - my1 = MyObject.objects.create(data='foo') + my1 = MyObject.objects.create(data="foo") my1.parent = my1 my1.save() - my2 = MyObject.objects.create(data='bar', parent=my1) - parents = MyObject.objects.filter(parent=F('id')) - children = MyObject.objects.filter(parent__in=parents).exclude(parent=F('id')) + my2 = MyObject.objects.create(data="bar", parent=my1) + parents = MyObject.objects.filter(parent=F("id")) + children = MyObject.objects.filter(parent__in=parents).exclude(parent=F("id")) self.assertEqual(list(parents), [my1]) # Evaluating the children query (which has parents as part of it) does # not change results for the parents query. @@ -3584,11 +3954,11 @@ class Ticket20101Tests(TestCase): """ Tests QuerySet ORed combining in exclude subquery case. """ - t = Tag.objects.create(name='foo') - a1 = Annotation.objects.create(tag=t, name='a1') - a2 = Annotation.objects.create(tag=t, name='a2') - a3 = Annotation.objects.create(tag=t, name='a3') - n = Note.objects.create(note='foo', misc='bar') + t = Tag.objects.create(name="foo") + a1 = Annotation.objects.create(tag=t, name="a1") + a2 = Annotation.objects.create(tag=t, name="a2") + a3 = Annotation.objects.create(tag=t, name="a3") + n = Note.objects.create(note="foo", misc="bar") qs1 = Note.objects.exclude(annotation__in=[a1, a2]) qs2 = Note.objects.filter(annotation__in=[a3]) self.assertIn(n, qs1) @@ -3598,11 +3968,11 @@ class Ticket20101Tests(TestCase): class EmptyStringPromotionTests(SimpleTestCase): def test_empty_string_promotion(self): - qs = RelatedObject.objects.filter(single__name='') + qs = RelatedObject.objects.filter(single__name="") if connection.features.interprets_empty_strings_as_nulls: - self.assertIn('LEFT OUTER JOIN', str(qs.query)) + self.assertIn("LEFT OUTER JOIN", str(qs.query)) else: - self.assertNotIn('LEFT OUTER JOIN', str(qs.query)) + self.assertNotIn("LEFT OUTER JOIN", str(qs.query)) class ValuesSubqueryTests(TestCase): @@ -3618,19 +3988,22 @@ class ValuesSubqueryTests(TestCase): # The query below should match o1 as it has related order_item # with id == status. - self.assertSequenceEqual(Order.objects.filter(items__in=OrderItem.objects.values_list('status')), [o1]) + self.assertSequenceEqual( + Order.objects.filter(items__in=OrderItem.objects.values_list("status")), + [o1], + ) class DoubleInSubqueryTests(TestCase): def test_double_subquery_in(self): - lfa1 = LeafA.objects.create(data='foo') - lfa2 = LeafA.objects.create(data='bar') - lfb1 = LeafB.objects.create(data='lfb1') - lfb2 = LeafB.objects.create(data='lfb2') + lfa1 = LeafA.objects.create(data="foo") + lfa2 = LeafA.objects.create(data="bar") + lfb1 = LeafB.objects.create(data="lfb1") + lfb2 = LeafB.objects.create(data="lfb2") Join.objects.create(a=lfa1, b=lfb1) Join.objects.create(a=lfa2, b=lfb2) - leaf_as = LeafA.objects.filter(data='foo').values_list('pk', flat=True) - joins = Join.objects.filter(a__in=leaf_as).values_list('b__id', flat=True) + leaf_as = LeafA.objects.filter(data="foo").values_list("pk", flat=True) + joins = Join.objects.filter(a__in=leaf_as).values_list("b__id", flat=True) qs = LeafB.objects.filter(pk__in=joins) self.assertSequenceEqual(qs, [lfb1]) @@ -3638,13 +4011,13 @@ class DoubleInSubqueryTests(TestCase): class Ticket18785Tests(SimpleTestCase): def test_ticket_18785(self): # Test join trimming from ticket18785 - qs = Item.objects.exclude( - note__isnull=False - ).filter( - name='something', creator__extra__isnull=True - ).order_by() - self.assertEqual(1, str(qs.query).count('INNER JOIN')) - self.assertEqual(0, str(qs.query).count('OUTER JOIN')) + qs = ( + Item.objects.exclude(note__isnull=False) + .filter(name="something", creator__extra__isnull=True) + .order_by() + ) + self.assertEqual(1, str(qs.query).count("INNER JOIN")) + self.assertEqual(0, str(qs.query).count("OUTER JOIN")) class Ticket20788Tests(TestCase): @@ -3694,23 +4067,35 @@ class RelatedLookupTypeTests(TestCase): query lookup. """ # Passing incorrect object type - with self.assertRaisesMessage(ValueError, self.error % (self.wrong_type, ObjectA._meta.object_name)): + with self.assertRaisesMessage( + ValueError, self.error % (self.wrong_type, ObjectA._meta.object_name) + ): ObjectB.objects.get(objecta=self.wrong_type) - with self.assertRaisesMessage(ValueError, self.error % (self.wrong_type, ObjectA._meta.object_name)): + with self.assertRaisesMessage( + ValueError, self.error % (self.wrong_type, ObjectA._meta.object_name) + ): ObjectB.objects.filter(objecta__in=[self.wrong_type]) - with self.assertRaisesMessage(ValueError, self.error % (self.wrong_type, ObjectA._meta.object_name)): + with self.assertRaisesMessage( + ValueError, self.error % (self.wrong_type, ObjectA._meta.object_name) + ): ObjectB.objects.filter(objecta=self.wrong_type) - with self.assertRaisesMessage(ValueError, self.error % (self.wrong_type, ObjectB._meta.object_name)): + with self.assertRaisesMessage( + ValueError, self.error % (self.wrong_type, ObjectB._meta.object_name) + ): ObjectA.objects.filter(objectb__in=[self.wrong_type, self.ob]) # Passing an object of the class on which query is done. - with self.assertRaisesMessage(ValueError, self.error % (self.ob, ObjectA._meta.object_name)): + with self.assertRaisesMessage( + ValueError, self.error % (self.ob, ObjectA._meta.object_name) + ): ObjectB.objects.filter(objecta__in=[self.poa, self.ob]) - with self.assertRaisesMessage(ValueError, self.error % (self.ob, ChildObjectA._meta.object_name)): + with self.assertRaisesMessage( + ValueError, self.error % (self.ob, ChildObjectA._meta.object_name) + ): ObjectC.objects.exclude(childobjecta__in=[self.coa, self.ob]) def test_wrong_backward_lookup(self): @@ -3718,13 +4103,19 @@ class RelatedLookupTypeTests(TestCase): A ValueError is raised when the incorrect object type is passed to a query lookup for backward relations. """ - with self.assertRaisesMessage(ValueError, self.error % (self.oa, ObjectB._meta.object_name)): + with self.assertRaisesMessage( + ValueError, self.error % (self.oa, ObjectB._meta.object_name) + ): ObjectA.objects.filter(objectb__in=[self.oa, self.ob]) - with self.assertRaisesMessage(ValueError, self.error % (self.oa, ObjectB._meta.object_name)): + with self.assertRaisesMessage( + ValueError, self.error % (self.oa, ObjectB._meta.object_name) + ): ObjectA.objects.exclude(objectb=self.oa) - with self.assertRaisesMessage(ValueError, self.error % (self.wrong_type, ObjectB._meta.object_name)): + with self.assertRaisesMessage( + ValueError, self.error % (self.wrong_type, ObjectB._meta.object_name) + ): ObjectA.objects.get(objectb=self.wrong_type) def test_correct_lookup(self): @@ -3737,15 +4128,24 @@ class RelatedLookupTypeTests(TestCase): out_c = [self.c] # proxy model objects - self.assertSequenceEqual(ObjectB.objects.filter(objecta=self.poa).order_by('name'), out_b) - self.assertSequenceEqual(ObjectA.objects.filter(objectb__in=self.pob).order_by('pk'), out_a * 2) + self.assertSequenceEqual( + ObjectB.objects.filter(objecta=self.poa).order_by("name"), out_b + ) + self.assertSequenceEqual( + ObjectA.objects.filter(objectb__in=self.pob).order_by("pk"), out_a * 2 + ) # child objects self.assertSequenceEqual(ObjectB.objects.filter(objecta__in=[self.coa]), []) - self.assertSequenceEqual(ObjectB.objects.filter(objecta__in=[self.poa, self.coa]).order_by('name'), out_b) self.assertSequenceEqual( - ObjectB.objects.filter(objecta__in=iter([self.poa, self.coa])).order_by('name'), - out_b + ObjectB.objects.filter(objecta__in=[self.poa, self.coa]).order_by("name"), + out_b, + ) + self.assertSequenceEqual( + ObjectB.objects.filter(objecta__in=iter([self.poa, self.coa])).order_by( + "name" + ), + out_b, ) # parent objects @@ -3761,35 +4161,40 @@ class RelatedLookupTypeTests(TestCase): #23396 - Ensure ValueQuerySets are not checked for compatibility with the lookup field """ # Make sure the num and objecta field values match. - ob = ObjectB.objects.get(name='ob') + ob = ObjectB.objects.get(name="ob") ob.num = ob.objecta.pk ob.save() - pob = ObjectB.objects.get(name='pob') + pob = ObjectB.objects.get(name="pob") pob.num = pob.objecta.pk pob.save() - self.assertSequenceEqual(ObjectB.objects.filter( - objecta__in=ObjectB.objects.all().values_list('num') - ).order_by('pk'), [ob, pob]) + self.assertSequenceEqual( + ObjectB.objects.filter( + objecta__in=ObjectB.objects.all().values_list("num") + ).order_by("pk"), + [ob, pob], + ) class Ticket14056Tests(TestCase): def test_ticket_14056(self): - s1 = SharedConnection.objects.create(data='s1') - s2 = SharedConnection.objects.create(data='s2') - s3 = SharedConnection.objects.create(data='s3') + s1 = SharedConnection.objects.create(data="s1") + s2 = SharedConnection.objects.create(data="s2") + s3 = SharedConnection.objects.create(data="s3") PointerA.objects.create(connection=s2) expected_ordering = ( - [s1, s3, s2] if connection.features.nulls_order_largest - else [s2, s1, s3] + [s1, s3, s2] if connection.features.nulls_order_largest else [s2, s1, s3] + ) + self.assertSequenceEqual( + SharedConnection.objects.order_by("-pointera__connection", "pk"), + expected_ordering, ) - self.assertSequenceEqual(SharedConnection.objects.order_by('-pointera__connection', 'pk'), expected_ordering) class Ticket20955Tests(TestCase): def test_ticket_20955(self): - jack = Staff.objects.create(name='jackstaff') + jack = Staff.objects.create(name="jackstaff") jackstaff = StaffUser.objects.create(staff=jack) - jill = Staff.objects.create(name='jillstaff') + jill = Staff.objects.create(name="jillstaff") jillstaff = StaffUser.objects.create(staff=jill) task = Task.objects.create(creator=jackstaff, owner=jillstaff, title="task") task_get = Task.objects.get(pk=task.pk) @@ -3798,21 +4203,26 @@ class Ticket20955Tests(TestCase): task_get.creator.staffuser.staff task_get.owner.staffuser.staff qs = Task.objects.select_related( - 'creator__staffuser__staff', 'owner__staffuser__staff') - self.assertEqual(str(qs.query).count(' JOIN '), 6) + "creator__staffuser__staff", "owner__staffuser__staff" + ) + self.assertEqual(str(qs.query).count(" JOIN "), 6) task_select_related = qs.get(pk=task.pk) with self.assertNumQueries(0): - self.assertEqual(task_select_related.creator.staffuser.staff, - task_get.creator.staffuser.staff) - self.assertEqual(task_select_related.owner.staffuser.staff, - task_get.owner.staffuser.staff) + self.assertEqual( + task_select_related.creator.staffuser.staff, + task_get.creator.staffuser.staff, + ) + self.assertEqual( + task_select_related.owner.staffuser.staff, + task_get.owner.staffuser.staff, + ) class Ticket21203Tests(TestCase): def test_ticket_21203(self): p = Ticket21203Parent.objects.create(parent_bool=True) c = Ticket21203Child.objects.create(parent=p) - qs = Ticket21203Child.objects.select_related('parent').defer('parent__created') + qs = Ticket21203Child.objects.select_related("parent").defer("parent__created") self.assertSequenceEqual(qs, [c]) self.assertIs(qs[0].parent.parent_bool, True) @@ -3820,16 +4230,16 @@ class Ticket21203Tests(TestCase): class ValuesJoinPromotionTests(TestCase): def test_values_no_promotion_for_existing(self): qs = Node.objects.filter(parent__parent__isnull=False) - self.assertIn(' INNER JOIN ', str(qs.query)) - qs = qs.values('parent__parent__id') - self.assertIn(' INNER JOIN ', str(qs.query)) + self.assertIn(" INNER JOIN ", str(qs.query)) + qs = qs.values("parent__parent__id") + self.assertIn(" INNER JOIN ", str(qs.query)) # Make sure there is a left outer join without the filter. - qs = Node.objects.values('parent__parent__id') - self.assertIn(' LEFT OUTER JOIN ', str(qs.query)) + qs = Node.objects.values("parent__parent__id") + self.assertIn(" LEFT OUTER JOIN ", str(qs.query)) def test_non_nullable_fk_not_promoted(self): - qs = ObjectB.objects.values('objecta__name') - self.assertIn(' INNER JOIN ', str(qs.query)) + qs = ObjectB.objects.values("objecta__name") + self.assertIn(" INNER JOIN ", str(qs.query)) def test_ticket_21376(self): a = ObjectA.objects.create() @@ -3842,24 +4252,29 @@ class ValuesJoinPromotionTests(TestCase): ) self.assertEqual(qs.count(), 1) tblname = connection.ops.quote_name(ObjectB._meta.db_table) - self.assertIn(' LEFT OUTER JOIN %s' % tblname, str(qs.query)) + self.assertIn(" LEFT OUTER JOIN %s" % tblname, str(qs.query)) class ForeignKeyToBaseExcludeTests(TestCase): def test_ticket_21787(self): - sc1 = SpecialCategory.objects.create(special_name='sc1', name='sc1') - sc2 = SpecialCategory.objects.create(special_name='sc2', name='sc2') - sc3 = SpecialCategory.objects.create(special_name='sc3', name='sc3') + sc1 = SpecialCategory.objects.create(special_name="sc1", name="sc1") + sc2 = SpecialCategory.objects.create(special_name="sc2", name="sc2") + sc3 = SpecialCategory.objects.create(special_name="sc3", name="sc3") c1 = CategoryItem.objects.create(category=sc1) CategoryItem.objects.create(category=sc2) - self.assertSequenceEqual(SpecialCategory.objects.exclude(categoryitem__id=c1.pk).order_by('name'), [sc2, sc3]) - self.assertSequenceEqual(SpecialCategory.objects.filter(categoryitem__id=c1.pk), [sc1]) + self.assertSequenceEqual( + SpecialCategory.objects.exclude(categoryitem__id=c1.pk).order_by("name"), + [sc2, sc3], + ) + self.assertSequenceEqual( + SpecialCategory.objects.filter(categoryitem__id=c1.pk), [sc1] + ) class ReverseM2MCustomPkTests(TestCase): def test_ticket_21879(self): - cpt1 = CustomPkTag.objects.create(id='cpt1', tag='cpt1') - cp1 = CustomPk.objects.create(name='cp1', extra='extra') + cpt1 = CustomPkTag.objects.create(id="cpt1", tag="cpt1") + cp1 = CustomPk.objects.create(name="cp1", extra="extra") cp1.custompktag_set.add(cpt1) self.assertSequenceEqual(CustomPk.objects.filter(custompktag=cpt1), [cp1]) self.assertSequenceEqual(CustomPkTag.objects.filter(custom_pk=cp1), [cpt1]) @@ -3876,7 +4291,7 @@ class Ticket22429Tests(TestCase): cr = Classroom.objects.create(school=sc1) cr.students.add(st1) - queryset = Student.objects.filter(~Q(classroom__school=F('school'))) + queryset = Student.objects.filter(~Q(classroom__school=F("school"))) self.assertSequenceEqual(queryset, [st2]) @@ -3891,25 +4306,32 @@ class Ticket23605Tests(TestCase): a2 = Ticket23605A.objects.create() c1 = Ticket23605C.objects.create(field_c0=10000.0) Ticket23605B.objects.create( - field_b0=10000.0, field_b1=True, - modelc_fk=c1, modela_fk=a1) - complex_q = Q(pk__in=Ticket23605A.objects.filter( - Q( - # True for a1 as field_b0 = 10000, field_c0=10000 - # False for a2 as no ticket23605b found - ticket23605b__field_b0__gte=1000000 / - F("ticket23605b__modelc_fk__field_c0") - ) & - # True for a1 (field_b1=True) - Q(ticket23605b__field_b1=True) & ~Q(ticket23605b__pk__in=Ticket23605B.objects.filter( - ~( - # Same filters as above commented filters, but - # double-negated (one for Q() above, one for - # parentheses). So, again a1 match, a2 not. - Q(field_b1=True) & - Q(field_b0__gte=1000000 / F("modelc_fk__field_c0")) + field_b0=10000.0, field_b1=True, modelc_fk=c1, modela_fk=a1 + ) + complex_q = Q( + pk__in=Ticket23605A.objects.filter( + Q( + # True for a1 as field_b0 = 10000, field_c0=10000 + # False for a2 as no ticket23605b found + ticket23605b__field_b0__gte=1000000 + / F("ticket23605b__modelc_fk__field_c0") ) - ))).filter(ticket23605b__field_b1=True)) + & + # True for a1 (field_b1=True) + Q(ticket23605b__field_b1=True) + & ~Q( + ticket23605b__pk__in=Ticket23605B.objects.filter( + ~( + # Same filters as above commented filters, but + # double-negated (one for Q() above, one for + # parentheses). So, again a1 match, a2 not. + Q(field_b1=True) + & Q(field_b0__gte=1000000 / F("modelc_fk__field_c0")) + ) + ) + ) + ).filter(ticket23605b__field_b1=True) + ) qs1 = Ticket23605A.objects.filter(complex_q) self.assertSequenceEqual(qs1, [a1]) qs2 = Ticket23605A.objects.exclude(complex_q) @@ -3927,9 +4349,9 @@ class TestInvalidValuesRelation(SimpleTestCase): def test_invalid_values(self): msg = "Field 'id' expected a number but got 'abc'." with self.assertRaisesMessage(ValueError, msg): - Annotation.objects.filter(tag='abc') + Annotation.objects.filter(tag="abc") with self.assertRaisesMessage(ValueError, msg): - Annotation.objects.filter(tag__in=[123, 'abc']) + Annotation.objects.filter(tag__in=[123, "abc"]) class TestTicket24605(TestCase): @@ -3944,15 +4366,22 @@ class TestTicket24605(TestCase): i3 = Individual.objects.create(alive=True) i4 = Individual.objects.create(alive=False) - self.assertSequenceEqual(Individual.objects.filter(Q(alive=False), Q(related_individual__isnull=True)), [i4]) self.assertSequenceEqual( - Individual.objects.exclude(Q(alive=False), Q(related_individual__isnull=True)).order_by('pk'), - [i1, i2, i3] + Individual.objects.filter( + Q(alive=False), Q(related_individual__isnull=True) + ), + [i4], + ) + self.assertSequenceEqual( + Individual.objects.exclude( + Q(alive=False), Q(related_individual__isnull=True) + ).order_by("pk"), + [i1, i2, i3], ) class Ticket23622Tests(TestCase): - @skipUnlessDBFeature('can_distinct_on_fields') + @skipUnlessDBFeature("can_distinct_on_fields") def test_ticket_23622(self): """ Make sure __pk__in and __in work the same for related fields when @@ -3962,55 +4391,65 @@ class Ticket23622Tests(TestCase): a2 = Ticket23605A.objects.create() c1 = Ticket23605C.objects.create(field_c0=0.0) Ticket23605B.objects.create( - modela_fk=a1, field_b0=123, + modela_fk=a1, + field_b0=123, field_b1=True, modelc_fk=c1, ) Ticket23605B.objects.create( - modela_fk=a1, field_b0=23, + modela_fk=a1, + field_b0=23, field_b1=True, modelc_fk=c1, ) Ticket23605B.objects.create( - modela_fk=a1, field_b0=234, + modela_fk=a1, + field_b0=234, field_b1=True, modelc_fk=c1, ) Ticket23605B.objects.create( - modela_fk=a1, field_b0=12, + modela_fk=a1, + field_b0=12, field_b1=True, modelc_fk=c1, ) Ticket23605B.objects.create( - modela_fk=a2, field_b0=567, + modela_fk=a2, + field_b0=567, field_b1=True, modelc_fk=c1, ) Ticket23605B.objects.create( - modela_fk=a2, field_b0=76, + modela_fk=a2, + field_b0=76, field_b1=True, modelc_fk=c1, ) Ticket23605B.objects.create( - modela_fk=a2, field_b0=7, + modela_fk=a2, + field_b0=7, field_b1=True, modelc_fk=c1, ) Ticket23605B.objects.create( - modela_fk=a2, field_b0=56, + modela_fk=a2, + field_b0=56, field_b1=True, modelc_fk=c1, ) - qx = ( - Q(ticket23605b__pk__in=Ticket23605B.objects.order_by('modela_fk', '-field_b1').distinct('modela_fk')) & - Q(ticket23605b__field_b0__gte=300) - ) - qy = ( - Q(ticket23605b__in=Ticket23605B.objects.order_by('modela_fk', '-field_b1').distinct('modela_fk')) & - Q(ticket23605b__field_b0__gte=300) - ) + qx = Q( + ticket23605b__pk__in=Ticket23605B.objects.order_by( + "modela_fk", "-field_b1" + ).distinct("modela_fk") + ) & Q(ticket23605b__field_b0__gte=300) + qy = Q( + ticket23605b__in=Ticket23605B.objects.order_by( + "modela_fk", "-field_b1" + ).distinct("modela_fk") + ) & Q(ticket23605b__field_b0__gte=300) self.assertEqual( - set(Ticket23605A.objects.filter(qx).values_list('pk', flat=True)), - set(Ticket23605A.objects.filter(qy).values_list('pk', flat=True)) + set(Ticket23605A.objects.filter(qx).values_list("pk", flat=True)), + set(Ticket23605A.objects.filter(qy).values_list("pk", flat=True)), ) self.assertSequenceEqual(Ticket23605A.objects.filter(qx), [a2]) |