diff options
author | django-bot <ops@djangoproject.com> | 2022-02-03 20:24:19 +0100 |
---|---|---|
committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2022-02-07 20:37:05 +0100 |
commit | 9c19aff7c7561e3a82978a272ecdaad40dda5c00 (patch) | |
tree | f0506b668a013d0063e5fba3dbf4863b466713ba /tests/expressions | |
parent | f68fa8b45dfac545cfc4111d4e52804c86db68d3 (diff) | |
download | django-9c19aff7c7561e3a82978a272ecdaad40dda5c00.tar.gz |
Refs #33476 -- Reformatted code with Black.
Diffstat (limited to 'tests/expressions')
-rw-r--r-- | tests/expressions/models.py | 20 | ||||
-rw-r--r-- | tests/expressions/test_queryset_values.py | 58 | ||||
-rw-r--r-- | tests/expressions/tests.py | 1656 |
3 files changed, 1025 insertions, 709 deletions
diff --git a/tests/expressions/models.py b/tests/expressions/models.py index 82b9629f81..35985dc5f0 100644 --- a/tests/expressions/models.py +++ b/tests/expressions/models.py @@ -17,7 +17,7 @@ class Employee(models.Model): manager = models.ForeignKey(Manager, models.CASCADE, null=True) def __str__(self): - return '%s %s' % (self.firstname, self.lastname) + return "%s %s" % (self.firstname, self.lastname) class RemoteEmployee(Employee): @@ -31,12 +31,12 @@ class Company(models.Model): ceo = models.ForeignKey( Employee, models.CASCADE, - related_name='company_ceo_set', + related_name="company_ceo_set", ) point_of_contact = models.ForeignKey( Employee, models.SET_NULL, - related_name='company_point_of_contact_set', + related_name="company_point_of_contact_set", null=True, ) based_in_eu = models.BooleanField(default=False) @@ -46,12 +46,12 @@ class Company(models.Model): class Number(models.Model): - integer = models.BigIntegerField(db_column='the_integer') - float = models.FloatField(null=True, db_column='the_float') + integer = models.BigIntegerField(db_column="the_integer") + float = models.FloatField(null=True, db_column="the_float") decimal_value = models.DecimalField(max_digits=20, decimal_places=17, null=True) def __str__(self): - return '%i, %.3f, %.17f' % (self.integer, self.float, self.decimal_value) + return "%i, %.3f, %.17f" % (self.integer, self.float, self.decimal_value) class Experiment(models.Model): @@ -64,8 +64,8 @@ class Experiment(models.Model): scalar = models.IntegerField(null=True) class Meta: - db_table = 'expressions_ExPeRiMeNt' - ordering = ('name',) + db_table = "expressions_ExPeRiMeNt" + ordering = ("name",) def duration(self): return self.end - self.start @@ -87,8 +87,8 @@ class Time(models.Model): class SimulationRun(models.Model): - start = models.ForeignKey(Time, models.CASCADE, null=True, related_name='+') - end = models.ForeignKey(Time, models.CASCADE, null=True, related_name='+') + start = models.ForeignKey(Time, models.CASCADE, null=True, related_name="+") + end = models.ForeignKey(Time, models.CASCADE, null=True, related_name="+") midpoint = models.TimeField() def __str__(self): diff --git a/tests/expressions/test_queryset_values.py b/tests/expressions/test_queryset_values.py index a15cc26023..147f02fffb 100644 --- a/tests/expressions/test_queryset_values.py +++ b/tests/expressions/test_queryset_values.py @@ -8,54 +8,66 @@ class ValuesExpressionsTests(TestCase): @classmethod def setUpTestData(cls): Company.objects.create( - name='Example Inc.', num_employees=2300, num_chairs=5, - ceo=Employee.objects.create(firstname='Joe', lastname='Smith', salary=10) + name="Example Inc.", + num_employees=2300, + num_chairs=5, + ceo=Employee.objects.create(firstname="Joe", lastname="Smith", salary=10), ) Company.objects.create( - name='Foobar Ltd.', num_employees=3, num_chairs=4, - ceo=Employee.objects.create(firstname='Frank', lastname='Meyer', salary=20) + name="Foobar Ltd.", + num_employees=3, + num_chairs=4, + ceo=Employee.objects.create(firstname="Frank", lastname="Meyer", salary=20), ) Company.objects.create( - name='Test GmbH', num_employees=32, num_chairs=1, - ceo=Employee.objects.create(firstname='Max', lastname='Mustermann', salary=30) + name="Test GmbH", + num_employees=32, + num_chairs=1, + ceo=Employee.objects.create( + firstname="Max", lastname="Mustermann", salary=30 + ), ) def test_values_expression(self): self.assertSequenceEqual( - Company.objects.values(salary=F('ceo__salary')), - [{'salary': 10}, {'salary': 20}, {'salary': 30}], + Company.objects.values(salary=F("ceo__salary")), + [{"salary": 10}, {"salary": 20}, {"salary": 30}], ) def test_values_expression_group_by(self): # values() applies annotate() first, so values selected are grouped by # id, not firstname. - Employee.objects.create(firstname='Joe', lastname='Jones', salary=2) - joes = Employee.objects.filter(firstname='Joe') + Employee.objects.create(firstname="Joe", lastname="Jones", salary=2) + joes = Employee.objects.filter(firstname="Joe") self.assertSequenceEqual( - joes.values('firstname', sum_salary=Sum('salary')).order_by('sum_salary'), - [{'firstname': 'Joe', 'sum_salary': 2}, {'firstname': 'Joe', 'sum_salary': 10}], + joes.values("firstname", sum_salary=Sum("salary")).order_by("sum_salary"), + [ + {"firstname": "Joe", "sum_salary": 2}, + {"firstname": "Joe", "sum_salary": 10}, + ], ) self.assertSequenceEqual( - joes.values('firstname').annotate(sum_salary=Sum('salary')), - [{'firstname': 'Joe', 'sum_salary': 12}] + joes.values("firstname").annotate(sum_salary=Sum("salary")), + [{"firstname": "Joe", "sum_salary": 12}], ) def test_chained_values_with_expression(self): - Employee.objects.create(firstname='Joe', lastname='Jones', salary=2) - joes = Employee.objects.filter(firstname='Joe').values('firstname') + Employee.objects.create(firstname="Joe", lastname="Jones", salary=2) + joes = Employee.objects.filter(firstname="Joe").values("firstname") self.assertSequenceEqual( - joes.values('firstname', sum_salary=Sum('salary')), - [{'firstname': 'Joe', 'sum_salary': 12}] + joes.values("firstname", sum_salary=Sum("salary")), + [{"firstname": "Joe", "sum_salary": 12}], ) self.assertSequenceEqual( - joes.values(sum_salary=Sum('salary')), - [{'sum_salary': 12}] + joes.values(sum_salary=Sum("salary")), [{"sum_salary": 12}] ) def test_values_list_expression(self): - companies = Company.objects.values_list('name', F('ceo__salary')) - self.assertSequenceEqual(companies, [('Example Inc.', 10), ('Foobar Ltd.', 20), ('Test GmbH', 30)]) + companies = Company.objects.values_list("name", F("ceo__salary")) + self.assertSequenceEqual( + companies, [("Example Inc.", 10), ("Foobar Ltd.", 20), ("Test GmbH", 30)] + ) def test_values_list_expression_flat(self): - companies = Company.objects.values_list(F('ceo__salary'), flat=True) + companies = Company.objects.values_list(F("ceo__salary"), flat=True) self.assertSequenceEqual(companies, (10, 20, 30)) diff --git a/tests/expressions/tests.py b/tests/expressions/tests.py index 1aa0049cbe..d800462129 100644 --- a/tests/expressions/tests.py +++ b/tests/expressions/tests.py @@ -10,29 +10,79 @@ from unittest import mock from django.core.exceptions import FieldError from django.db import DatabaseError, NotSupportedError, connection from django.db.models import ( - AutoField, Avg, BinaryField, BooleanField, Case, CharField, Count, - DateField, DateTimeField, DecimalField, DurationField, Exists, Expression, - ExpressionList, ExpressionWrapper, F, FloatField, Func, IntegerField, Max, - Min, Model, OrderBy, OuterRef, Q, StdDev, Subquery, Sum, TimeField, - UUIDField, Value, Variance, When, + AutoField, + Avg, + BinaryField, + BooleanField, + Case, + CharField, + Count, + DateField, + DateTimeField, + DecimalField, + DurationField, + Exists, + Expression, + ExpressionList, + ExpressionWrapper, + F, + FloatField, + Func, + IntegerField, + Max, + Min, + Model, + OrderBy, + OuterRef, + Q, + StdDev, + Subquery, + Sum, + TimeField, + UUIDField, + Value, + Variance, + When, ) from django.db.models.expressions import ( - Col, Combinable, CombinedExpression, RawSQL, Ref, + Col, + Combinable, + CombinedExpression, + RawSQL, + Ref, ) from django.db.models.functions import ( - Coalesce, Concat, Left, Length, Lower, Substr, Upper, + Coalesce, + Concat, + Left, + Length, + Lower, + Substr, + Upper, ) from django.db.models.sql import constants from django.db.models.sql.datastructures import Join from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature from django.test.utils import ( - Approximate, CaptureQueriesContext, isolate_apps, register_lookup, + Approximate, + CaptureQueriesContext, + isolate_apps, + register_lookup, ) from django.utils.functional import SimpleLazyObject from .models import ( - UUID, UUIDPK, Company, Employee, Experiment, Manager, Number, - RemoteEmployee, Result, SimulationRun, Time, + UUID, + UUIDPK, + Company, + Employee, + Experiment, + Manager, + Number, + RemoteEmployee, + Result, + SimulationRun, + Time, ) @@ -40,53 +90,71 @@ class BasicExpressionsTests(TestCase): @classmethod def setUpTestData(cls): cls.example_inc = Company.objects.create( - name="Example Inc.", num_employees=2300, num_chairs=5, - ceo=Employee.objects.create(firstname="Joe", lastname="Smith", salary=10) + name="Example Inc.", + num_employees=2300, + num_chairs=5, + ceo=Employee.objects.create(firstname="Joe", lastname="Smith", salary=10), ) cls.foobar_ltd = Company.objects.create( - name="Foobar Ltd.", num_employees=3, num_chairs=4, based_in_eu=True, - ceo=Employee.objects.create(firstname="Frank", lastname="Meyer", salary=20) + name="Foobar Ltd.", + num_employees=3, + num_chairs=4, + based_in_eu=True, + ceo=Employee.objects.create(firstname="Frank", lastname="Meyer", salary=20), + ) + cls.max = Employee.objects.create( + firstname="Max", lastname="Mustermann", salary=30 + ) + cls.gmbh = Company.objects.create( + name="Test GmbH", num_employees=32, num_chairs=1, ceo=cls.max ) - cls.max = Employee.objects.create(firstname='Max', lastname='Mustermann', salary=30) - cls.gmbh = Company.objects.create(name='Test GmbH', num_employees=32, num_chairs=1, ceo=cls.max) def setUp(self): self.company_query = Company.objects.values( "name", "num_employees", "num_chairs" - ).order_by( - "name", "num_employees", "num_chairs" - ) + ).order_by("name", "num_employees", "num_chairs") def test_annotate_values_aggregate(self): - companies = Company.objects.annotate( - salaries=F('ceo__salary'), - ).values('num_employees', 'salaries').aggregate( - result=Sum( - F('salaries') + F('num_employees'), - output_field=IntegerField() - ), + companies = ( + Company.objects.annotate( + salaries=F("ceo__salary"), + ) + .values("num_employees", "salaries") + .aggregate( + result=Sum( + F("salaries") + F("num_employees"), output_field=IntegerField() + ), + ) ) - self.assertEqual(companies['result'], 2395) + self.assertEqual(companies["result"], 2395) def test_annotate_values_filter(self): - companies = Company.objects.annotate( - foo=RawSQL('%s', ['value']), - ).filter(foo='value').order_by('name') + companies = ( + Company.objects.annotate( + foo=RawSQL("%s", ["value"]), + ) + .filter(foo="value") + .order_by("name") + ) self.assertSequenceEqual( companies, [self.example_inc, self.foobar_ltd, self.gmbh], ) def test_annotate_values_count(self): - companies = Company.objects.annotate(foo=RawSQL('%s', ['value'])) + companies = Company.objects.annotate(foo=RawSQL("%s", ["value"])) self.assertEqual(companies.count(), 3) - @skipUnlessDBFeature('supports_boolean_expr_in_select_clause') + @skipUnlessDBFeature("supports_boolean_expr_in_select_clause") def test_filtering_on_annotate_that_uses_q(self): self.assertEqual( Company.objects.annotate( - num_employees_check=ExpressionWrapper(Q(num_employees__gt=3), output_field=BooleanField()) - ).filter(num_employees_check=True).count(), + num_employees_check=ExpressionWrapper( + Q(num_employees__gt=3), output_field=BooleanField() + ) + ) + .filter(num_employees_check=True) + .count(), 2, ) @@ -101,7 +169,7 @@ class BasicExpressionsTests(TestCase): def test_filtering_on_rawsql_that_is_boolean(self): self.assertEqual( Company.objects.filter( - RawSQL('num_employees > %s', (3,), output_field=BooleanField()), + RawSQL("num_employees > %s", (3,), output_field=BooleanField()), ).count(), 2, ) @@ -111,17 +179,14 @@ class BasicExpressionsTests(TestCase): # find companies where the number of employees is greater # than the number of chairs. self.assertSequenceEqual( - self.company_query.filter(num_employees__gt=F("num_chairs")), [ + self.company_query.filter(num_employees__gt=F("num_chairs")), + [ { "num_chairs": 5, "name": "Example Inc.", "num_employees": 2300, }, - { - "num_chairs": 1, - "name": "Test GmbH", - "num_employees": 32 - }, + {"num_chairs": 1, "name": "Test GmbH", "num_employees": 32}, ], ) @@ -130,22 +195,11 @@ class BasicExpressionsTests(TestCase): # Make sure we have enough chairs self.company_query.update(num_chairs=F("num_employees")) self.assertSequenceEqual( - self.company_query, [ - { - "num_chairs": 2300, - "name": "Example Inc.", - "num_employees": 2300 - }, - { - "num_chairs": 3, - "name": "Foobar Ltd.", - "num_employees": 3 - }, - { - "num_chairs": 32, - "name": "Test GmbH", - "num_employees": 32 - } + self.company_query, + [ + {"num_chairs": 2300, "name": "Example Inc.", "num_employees": 2300}, + {"num_chairs": 3, "name": "Foobar Ltd.", "num_employees": 3}, + {"num_chairs": 32, "name": "Test GmbH", "num_employees": 32}, ], ) @@ -154,79 +208,50 @@ class BasicExpressionsTests(TestCase): # Make sure we have 2 spare chairs self.company_query.update(num_chairs=F("num_employees") + 2) self.assertSequenceEqual( - self.company_query, [ - { - 'num_chairs': 2302, - 'name': 'Example Inc.', - 'num_employees': 2300 - }, - { - 'num_chairs': 5, - 'name': 'Foobar Ltd.', - 'num_employees': 3 - }, - { - 'num_chairs': 34, - 'name': 'Test GmbH', - 'num_employees': 32 - } + self.company_query, + [ + {"num_chairs": 2302, "name": "Example Inc.", "num_employees": 2300}, + {"num_chairs": 5, "name": "Foobar Ltd.", "num_employees": 3}, + {"num_chairs": 34, "name": "Test GmbH", "num_employees": 32}, ], ) def test_order_of_operations(self): # Law of order of operations is followed - self.company_query.update(num_chairs=F('num_employees') + 2 * F('num_employees')) + self.company_query.update( + num_chairs=F("num_employees") + 2 * F("num_employees") + ) self.assertSequenceEqual( - self.company_query, [ - { - 'num_chairs': 6900, - 'name': 'Example Inc.', - 'num_employees': 2300 - }, - { - 'num_chairs': 9, - 'name': 'Foobar Ltd.', - 'num_employees': 3 - }, - { - 'num_chairs': 96, - 'name': 'Test GmbH', - 'num_employees': 32 - } + self.company_query, + [ + {"num_chairs": 6900, "name": "Example Inc.", "num_employees": 2300}, + {"num_chairs": 9, "name": "Foobar Ltd.", "num_employees": 3}, + {"num_chairs": 96, "name": "Test GmbH", "num_employees": 32}, ], ) def test_parenthesis_priority(self): # Law of order of operations can be overridden by parentheses - self.company_query.update(num_chairs=(F('num_employees') + 2) * F('num_employees')) + self.company_query.update( + num_chairs=(F("num_employees") + 2) * F("num_employees") + ) self.assertSequenceEqual( - self.company_query, [ - { - 'num_chairs': 5294600, - 'name': 'Example Inc.', - 'num_employees': 2300 - }, - { - 'num_chairs': 15, - 'name': 'Foobar Ltd.', - 'num_employees': 3 - }, - { - 'num_chairs': 1088, - 'name': 'Test GmbH', - 'num_employees': 32 - } + self.company_query, + [ + {"num_chairs": 5294600, "name": "Example Inc.", "num_employees": 2300}, + {"num_chairs": 15, "name": "Foobar Ltd.", "num_employees": 3}, + {"num_chairs": 1088, "name": "Test GmbH", "num_employees": 32}, ], ) def test_update_with_fk(self): # ForeignKey can become updated with the value of another ForeignKey. - self.assertEqual(Company.objects.update(point_of_contact=F('ceo')), 3) + self.assertEqual(Company.objects.update(point_of_contact=F("ceo")), 3) self.assertQuerysetEqual( Company.objects.all(), - ['Joe Smith', 'Frank Meyer', 'Max Mustermann'], + ["Joe Smith", "Frank Meyer", "Max Mustermann"], lambda c: str(c.point_of_contact), - ordered=False + ordered=False, ) def test_update_with_none(self): @@ -234,68 +259,75 @@ class BasicExpressionsTests(TestCase): Number.objects.create(integer=2) Number.objects.filter(float__isnull=False).update(float=Value(None)) self.assertQuerysetEqual( - Number.objects.all(), - [None, None], - lambda n: n.float, - ordered=False + Number.objects.all(), [None, None], lambda n: n.float, ordered=False ) def test_filter_with_join(self): # F Expressions can also span joins - Company.objects.update(point_of_contact=F('ceo')) + Company.objects.update(point_of_contact=F("ceo")) c = Company.objects.first() - c.point_of_contact = Employee.objects.create(firstname="Guido", lastname="van Rossum") + c.point_of_contact = Employee.objects.create( + firstname="Guido", lastname="van Rossum" + ) c.save() self.assertQuerysetEqual( - Company.objects.filter(ceo__firstname=F('point_of_contact__firstname')), - ['Foobar Ltd.', 'Test GmbH'], + Company.objects.filter(ceo__firstname=F("point_of_contact__firstname")), + ["Foobar Ltd.", "Test GmbH"], lambda c: c.name, - ordered=False + ordered=False, ) - Company.objects.exclude( - ceo__firstname=F("point_of_contact__firstname") - ).update(name="foo") + Company.objects.exclude(ceo__firstname=F("point_of_contact__firstname")).update( + name="foo" + ) self.assertEqual( - Company.objects.exclude( - ceo__firstname=F('point_of_contact__firstname') - ).get().name, + Company.objects.exclude(ceo__firstname=F("point_of_contact__firstname")) + .get() + .name, "foo", ) msg = "Joined field references are not permitted in this query" with self.assertRaisesMessage(FieldError, msg): Company.objects.exclude( - ceo__firstname=F('point_of_contact__firstname') - ).update(name=F('point_of_contact__lastname')) + ceo__firstname=F("point_of_contact__firstname") + ).update(name=F("point_of_contact__lastname")) def test_object_update(self): # F expressions can be used to update attributes on single objects - self.gmbh.num_employees = F('num_employees') + 4 + self.gmbh.num_employees = F("num_employees") + 4 self.gmbh.save() self.gmbh.refresh_from_db() self.assertEqual(self.gmbh.num_employees, 36) def test_new_object_save(self): # We should be able to use Funcs when inserting new data - test_co = Company(name=Lower(Value('UPPER')), num_employees=32, num_chairs=1, ceo=self.max) + test_co = Company( + name=Lower(Value("UPPER")), num_employees=32, num_chairs=1, ceo=self.max + ) test_co.save() test_co.refresh_from_db() self.assertEqual(test_co.name, "upper") def test_new_object_create(self): - test_co = Company.objects.create(name=Lower(Value('UPPER')), num_employees=32, num_chairs=1, ceo=self.max) + test_co = Company.objects.create( + name=Lower(Value("UPPER")), num_employees=32, num_chairs=1, ceo=self.max + ) test_co.refresh_from_db() self.assertEqual(test_co.name, "upper") def test_object_create_with_aggregate(self): # Aggregates are not allowed when inserting new data - msg = 'Aggregate functions are not allowed in this query (num_employees=Max(Value(1))).' + msg = "Aggregate functions are not allowed in this query (num_employees=Max(Value(1)))." with self.assertRaisesMessage(FieldError, msg): Company.objects.create( - name='Company', num_employees=Max(Value(1)), num_chairs=1, - ceo=Employee.objects.create(firstname="Just", lastname="Doit", salary=30), + name="Company", + num_employees=Max(Value(1)), + num_chairs=1, + ceo=Employee.objects.create( + firstname="Just", lastname="Doit", salary=30 + ), ) def test_object_update_fk(self): @@ -304,40 +336,42 @@ class BasicExpressionsTests(TestCase): test_gmbh = Company.objects.get(pk=self.gmbh.pk) msg = 'F(ceo)": "Company.point_of_contact" must be a "Employee" instance.' with self.assertRaisesMessage(ValueError, msg): - test_gmbh.point_of_contact = F('ceo') + test_gmbh.point_of_contact = F("ceo") test_gmbh.point_of_contact = self.gmbh.ceo test_gmbh.save() - test_gmbh.name = F('ceo__lastname') - msg = 'Joined field references are not permitted in this query' + test_gmbh.name = F("ceo__lastname") + msg = "Joined field references are not permitted in this query" with self.assertRaisesMessage(FieldError, msg): test_gmbh.save() def test_update_inherited_field_value(self): - msg = 'Joined field references are not permitted in this query' + msg = "Joined field references are not permitted in this query" with self.assertRaisesMessage(FieldError, msg): - RemoteEmployee.objects.update(adjusted_salary=F('salary') * 5) + RemoteEmployee.objects.update(adjusted_salary=F("salary") * 5) def test_object_update_unsaved_objects(self): # F expressions cannot be used to update attributes on objects which do # not yet exist in the database - acme = Company(name='The Acme Widget Co.', num_employees=12, num_chairs=5, ceo=self.max) + acme = Company( + name="The Acme Widget Co.", num_employees=12, num_chairs=5, ceo=self.max + ) acme.num_employees = F("num_employees") + 16 msg = ( 'Failed to insert expression "Col(expressions_company, ' 'expressions.Company.num_employees) + Value(16)" on ' - 'expressions.Company.num_employees. F() expressions can only be ' - 'used to update, not to insert.' + "expressions.Company.num_employees. F() expressions can only be " + "used to update, not to insert." ) with self.assertRaisesMessage(ValueError, msg): acme.save() acme.num_employees = 12 - acme.name = Lower(F('name')) + acme.name = Lower(F("name")) msg = ( 'Failed to insert expression "Lower(Col(expressions_company, ' 'expressions.Company.name))" on expressions.Company.name. F() ' - 'expressions can only be used to update, not to insert.' + "expressions can only be used to update, not to insert." ) with self.assertRaisesMessage(ValueError, msg): acme.save() @@ -346,7 +380,7 @@ class BasicExpressionsTests(TestCase): Employee.objects.create(firstname="John", lastname="Doe") test = Employee.objects.create(firstname="Test", lastname="test") - queryset = Employee.objects.filter(firstname__iexact=F('lastname')) + queryset = Employee.objects.filter(firstname__iexact=F("lastname")) self.assertSequenceEqual(queryset, [test]) def test_ticket_16731_startswith_lookup(self): @@ -354,18 +388,22 @@ class BasicExpressionsTests(TestCase): e2 = Employee.objects.create(firstname="Jack", lastname="Jackson") e3 = Employee.objects.create(firstname="Jack", lastname="jackson") self.assertSequenceEqual( - Employee.objects.filter(lastname__startswith=F('firstname')), - [e2, e3] if connection.features.has_case_insensitive_like else [e2] + Employee.objects.filter(lastname__startswith=F("firstname")), + [e2, e3] if connection.features.has_case_insensitive_like else [e2], + ) + qs = Employee.objects.filter(lastname__istartswith=F("firstname")).order_by( + "pk" ) - qs = Employee.objects.filter(lastname__istartswith=F('firstname')).order_by('pk') self.assertSequenceEqual(qs, [e2, e3]) def test_ticket_18375_join_reuse(self): # Reverse multijoin F() references and the lookup target the same join. # Pre #18375 the F() join was generated first and the lookup couldn't # reuse that join. - qs = Employee.objects.filter(company_ceo_set__num_chairs=F('company_ceo_set__num_employees')) - self.assertEqual(str(qs.query).count('JOIN'), 1) + qs = Employee.objects.filter( + company_ceo_set__num_chairs=F("company_ceo_set__num_employees") + ) + self.assertEqual(str(qs.query).count("JOIN"), 1) def test_ticket_18375_kwarg_ordering(self): # The next query was dict-randomization dependent - if the "gte=1" @@ -373,10 +411,10 @@ class BasicExpressionsTests(TestCase): # gte lookup, if F() was seen first, then it generated a join the # other lookups could not reuse. qs = Employee.objects.filter( - company_ceo_set__num_chairs=F('company_ceo_set__num_employees'), + company_ceo_set__num_chairs=F("company_ceo_set__num_employees"), company_ceo_set__num_chairs__gte=1, ) - self.assertEqual(str(qs.query).count('JOIN'), 1) + self.assertEqual(str(qs.query).count("JOIN"), 1) def test_ticket_18375_kwarg_ordering_2(self): # Another similar case for F() than above. Now we have the same join @@ -385,25 +423,27 @@ class BasicExpressionsTests(TestCase): # randomization was enabled, that is the generated query dependent # on which clause was seen first. qs = Employee.objects.filter( - company_ceo_set__num_employees=F('pk'), - pk=F('company_ceo_set__num_employees') + company_ceo_set__num_employees=F("pk"), + pk=F("company_ceo_set__num_employees"), ) - self.assertEqual(str(qs.query).count('JOIN'), 1) + self.assertEqual(str(qs.query).count("JOIN"), 1) def test_ticket_18375_chained_filters(self): # F() expressions do not reuse joins from previous filter. - qs = Employee.objects.filter( - company_ceo_set__num_employees=F('pk') - ).filter( - company_ceo_set__num_employees=F('company_ceo_set__num_employees') + qs = Employee.objects.filter(company_ceo_set__num_employees=F("pk")).filter( + company_ceo_set__num_employees=F("company_ceo_set__num_employees") ) - self.assertEqual(str(qs.query).count('JOIN'), 2) + self.assertEqual(str(qs.query).count("JOIN"), 2) def test_order_by_exists(self): - mary = Employee.objects.create(firstname='Mary', lastname='Mustermann', salary=20) - mustermanns_by_seniority = Employee.objects.filter(lastname='Mustermann').order_by( + mary = Employee.objects.create( + firstname="Mary", lastname="Mustermann", salary=20 + ) + mustermanns_by_seniority = Employee.objects.filter( + lastname="Mustermann" + ).order_by( # Order by whether the employee is the CEO of a company - Exists(Company.objects.filter(ceo=OuterRef('pk'))).desc() + Exists(Company.objects.filter(ceo=OuterRef("pk"))).desc() ) self.assertSequenceEqual(mustermanns_by_seniority, [self.max, mary]) @@ -424,7 +464,7 @@ class BasicExpressionsTests(TestCase): ELSE 0 END """, [], - ).asc() + ).asc(), ) for qs in ( Company.objects.all(), @@ -437,10 +477,10 @@ class BasicExpressionsTests(TestCase): ) def test_outerref(self): - inner = Company.objects.filter(point_of_contact=OuterRef('pk')) + inner = Company.objects.filter(point_of_contact=OuterRef("pk")) msg = ( - 'This queryset contains a reference to an outer query and may only ' - 'be used in a subquery.' + "This queryset contains a reference to an outer query and may only " + "be used in a subquery." ) with self.assertRaisesMessage(ValueError, msg): inner.exists() @@ -449,98 +489,119 @@ class BasicExpressionsTests(TestCase): self.assertIs(outer.exists(), True) def test_exist_single_field_output_field(self): - queryset = Company.objects.values('pk') + queryset = Company.objects.values("pk") self.assertIsInstance(Exists(queryset).output_field, BooleanField) def test_subquery(self): - Company.objects.filter(name='Example Inc.').update( - point_of_contact=Employee.objects.get(firstname='Joe', lastname='Smith'), + Company.objects.filter(name="Example Inc.").update( + point_of_contact=Employee.objects.get(firstname="Joe", lastname="Smith"), ceo=self.max, ) - Employee.objects.create(firstname='Bob', lastname='Brown', salary=40) - qs = Employee.objects.annotate( - is_point_of_contact=Exists(Company.objects.filter(point_of_contact=OuterRef('pk'))), - is_not_point_of_contact=~Exists(Company.objects.filter(point_of_contact=OuterRef('pk'))), - is_ceo_of_small_company=Exists(Company.objects.filter(num_employees__lt=200, ceo=OuterRef('pk'))), - is_ceo_small_2=~~Exists(Company.objects.filter(num_employees__lt=200, ceo=OuterRef('pk'))), - largest_company=Subquery(Company.objects.order_by('-num_employees').filter( - Q(ceo=OuterRef('pk')) | Q(point_of_contact=OuterRef('pk')) - ).values('name')[:1], output_field=CharField()) - ).values( - 'firstname', - 'is_point_of_contact', - 'is_not_point_of_contact', - 'is_ceo_of_small_company', - 'is_ceo_small_2', - 'largest_company', - ).order_by('firstname') + Employee.objects.create(firstname="Bob", lastname="Brown", salary=40) + qs = ( + Employee.objects.annotate( + is_point_of_contact=Exists( + Company.objects.filter(point_of_contact=OuterRef("pk")) + ), + is_not_point_of_contact=~Exists( + Company.objects.filter(point_of_contact=OuterRef("pk")) + ), + is_ceo_of_small_company=Exists( + Company.objects.filter(num_employees__lt=200, ceo=OuterRef("pk")) + ), + is_ceo_small_2=~~Exists( + Company.objects.filter(num_employees__lt=200, ceo=OuterRef("pk")) + ), + largest_company=Subquery( + Company.objects.order_by("-num_employees") + .filter(Q(ceo=OuterRef("pk")) | Q(point_of_contact=OuterRef("pk"))) + .values("name")[:1], + output_field=CharField(), + ), + ) + .values( + "firstname", + "is_point_of_contact", + "is_not_point_of_contact", + "is_ceo_of_small_company", + "is_ceo_small_2", + "largest_company", + ) + .order_by("firstname") + ) results = list(qs) # Could use Coalesce(subq, Value('')) instead except for the bug in # cx_Oracle mentioned in #23843. bob = results[0] - if bob['largest_company'] == '' and connection.features.interprets_empty_strings_as_nulls: - bob['largest_company'] = None - - self.assertEqual(results, [ - { - 'firstname': 'Bob', - 'is_point_of_contact': False, - 'is_not_point_of_contact': True, - 'is_ceo_of_small_company': False, - 'is_ceo_small_2': False, - 'largest_company': None, - }, - { - 'firstname': 'Frank', - 'is_point_of_contact': False, - 'is_not_point_of_contact': True, - 'is_ceo_of_small_company': True, - 'is_ceo_small_2': True, - 'largest_company': 'Foobar Ltd.', - }, - { - 'firstname': 'Joe', - 'is_point_of_contact': True, - 'is_not_point_of_contact': False, - 'is_ceo_of_small_company': False, - 'is_ceo_small_2': False, - 'largest_company': 'Example Inc.', - }, - { - 'firstname': 'Max', - 'is_point_of_contact': False, - 'is_not_point_of_contact': True, - 'is_ceo_of_small_company': True, - 'is_ceo_small_2': True, - 'largest_company': 'Example Inc.' - } - ]) + if ( + bob["largest_company"] == "" + and connection.features.interprets_empty_strings_as_nulls + ): + bob["largest_company"] = None + + self.assertEqual( + results, + [ + { + "firstname": "Bob", + "is_point_of_contact": False, + "is_not_point_of_contact": True, + "is_ceo_of_small_company": False, + "is_ceo_small_2": False, + "largest_company": None, + }, + { + "firstname": "Frank", + "is_point_of_contact": False, + "is_not_point_of_contact": True, + "is_ceo_of_small_company": True, + "is_ceo_small_2": True, + "largest_company": "Foobar Ltd.", + }, + { + "firstname": "Joe", + "is_point_of_contact": True, + "is_not_point_of_contact": False, + "is_ceo_of_small_company": False, + "is_ceo_small_2": False, + "largest_company": "Example Inc.", + }, + { + "firstname": "Max", + "is_point_of_contact": False, + "is_not_point_of_contact": True, + "is_ceo_of_small_company": True, + "is_ceo_small_2": True, + "largest_company": "Example Inc.", + }, + ], + ) # A less elegant way to write the same query: this uses a LEFT OUTER # JOIN and an IS NULL, inside a WHERE NOT IN which is probably less # efficient than EXISTS. self.assertCountEqual( - qs.filter(is_point_of_contact=True).values('pk'), - Employee.objects.exclude(company_point_of_contact_set=None).values('pk') + qs.filter(is_point_of_contact=True).values("pk"), + Employee.objects.exclude(company_point_of_contact_set=None).values("pk"), ) def test_subquery_eq(self): qs = Employee.objects.annotate( - is_ceo=Exists(Company.objects.filter(ceo=OuterRef('pk'))), + is_ceo=Exists(Company.objects.filter(ceo=OuterRef("pk"))), is_point_of_contact=Exists( - Company.objects.filter(point_of_contact=OuterRef('pk')), + Company.objects.filter(point_of_contact=OuterRef("pk")), ), small_company=Exists( queryset=Company.objects.filter(num_employees__lt=200), ), ).filter(is_ceo=True, is_point_of_contact=False, small_company=True) self.assertNotEqual( - qs.query.annotations['is_ceo'], - qs.query.annotations['is_point_of_contact'], + qs.query.annotations["is_ceo"], + qs.query.annotations["is_point_of_contact"], ) self.assertNotEqual( - qs.query.annotations['is_ceo'], - qs.query.annotations['small_company'], + qs.query.annotations["is_ceo"], + qs.query.annotations["small_company"], ) def test_subquery_sql(self): @@ -550,92 +611,114 @@ class BasicExpressionsTests(TestCase): self.assertIs(employees.query.subquery, False) compiler = employees_subquery.query.get_compiler(connection=connection) sql, _ = employees_subquery.as_sql(compiler, connection) - self.assertIn('(SELECT ', sql) + self.assertIn("(SELECT ", sql) def test_in_subquery(self): # This is a contrived test (and you really wouldn't write this query), # but it is a succinct way to test the __in=Subquery() construct. - small_companies = Company.objects.filter(num_employees__lt=200).values('pk') + small_companies = Company.objects.filter(num_employees__lt=200).values("pk") subquery_test = Company.objects.filter(pk__in=Subquery(small_companies)) self.assertCountEqual(subquery_test, [self.foobar_ltd, self.gmbh]) - subquery_test2 = Company.objects.filter(pk=Subquery(small_companies.filter(num_employees=3))) + subquery_test2 = Company.objects.filter( + pk=Subquery(small_companies.filter(num_employees=3)) + ) self.assertCountEqual(subquery_test2, [self.foobar_ltd]) def test_uuid_pk_subquery(self): u = UUIDPK.objects.create() UUID.objects.create(uuid_fk=u) - qs = UUIDPK.objects.filter(id__in=Subquery(UUID.objects.values('uuid_fk__id'))) + qs = UUIDPK.objects.filter(id__in=Subquery(UUID.objects.values("uuid_fk__id"))) self.assertCountEqual(qs, [u]) def test_nested_subquery(self): - inner = Company.objects.filter(point_of_contact=OuterRef('pk')) + inner = Company.objects.filter(point_of_contact=OuterRef("pk")) outer = Employee.objects.annotate(is_point_of_contact=Exists(inner)) contrived = Employee.objects.annotate( is_point_of_contact=Subquery( - outer.filter(pk=OuterRef('pk')).values('is_point_of_contact'), + outer.filter(pk=OuterRef("pk")).values("is_point_of_contact"), output_field=BooleanField(), ), ) self.assertCountEqual(contrived.values_list(), outer.values_list()) def test_nested_subquery_join_outer_ref(self): - inner = Employee.objects.filter(pk=OuterRef('ceo__pk')).values('pk') + inner = Employee.objects.filter(pk=OuterRef("ceo__pk")).values("pk") qs = Employee.objects.annotate( ceo_company=Subquery( Company.objects.filter( ceo__in=inner, - ceo__pk=OuterRef('pk'), - ).values('pk'), + ceo__pk=OuterRef("pk"), + ).values("pk"), ), ) self.assertSequenceEqual( - qs.values_list('ceo_company', flat=True), + qs.values_list("ceo_company", flat=True), [self.example_inc.pk, self.foobar_ltd.pk, self.gmbh.pk], ) def test_nested_subquery_outer_ref_2(self): - first = Time.objects.create(time='09:00') - second = Time.objects.create(time='17:00') - third = Time.objects.create(time='21:00') - SimulationRun.objects.bulk_create([ - SimulationRun(start=first, end=second, midpoint='12:00'), - SimulationRun(start=first, end=third, midpoint='15:00'), - SimulationRun(start=second, end=first, midpoint='00:00'), - ]) - inner = Time.objects.filter(time=OuterRef(OuterRef('time')), pk=OuterRef('start')).values('time') - middle = SimulationRun.objects.annotate(other=Subquery(inner)).values('other')[:1] + first = Time.objects.create(time="09:00") + second = Time.objects.create(time="17:00") + third = Time.objects.create(time="21:00") + SimulationRun.objects.bulk_create( + [ + SimulationRun(start=first, end=second, midpoint="12:00"), + SimulationRun(start=first, end=third, midpoint="15:00"), + SimulationRun(start=second, end=first, midpoint="00:00"), + ] + ) + inner = Time.objects.filter( + time=OuterRef(OuterRef("time")), pk=OuterRef("start") + ).values("time") + middle = SimulationRun.objects.annotate(other=Subquery(inner)).values("other")[ + :1 + ] outer = Time.objects.annotate(other=Subquery(middle, output_field=TimeField())) # This is a contrived example. It exercises the double OuterRef form. self.assertCountEqual(outer, [first, second, third]) def test_nested_subquery_outer_ref_with_autofield(self): - first = Time.objects.create(time='09:00') - second = Time.objects.create(time='17:00') - SimulationRun.objects.create(start=first, end=second, midpoint='12:00') - inner = SimulationRun.objects.filter(start=OuterRef(OuterRef('pk'))).values('start') - middle = Time.objects.annotate(other=Subquery(inner)).values('other')[:1] - outer = Time.objects.annotate(other=Subquery(middle, output_field=IntegerField())) + first = Time.objects.create(time="09:00") + second = Time.objects.create(time="17:00") + SimulationRun.objects.create(start=first, end=second, midpoint="12:00") + inner = SimulationRun.objects.filter(start=OuterRef(OuterRef("pk"))).values( + "start" + ) + middle = Time.objects.annotate(other=Subquery(inner)).values("other")[:1] + outer = Time.objects.annotate( + other=Subquery(middle, output_field=IntegerField()) + ) # This exercises the double OuterRef form with AutoField as pk. self.assertCountEqual(outer, [first, second]) def test_annotations_within_subquery(self): - Company.objects.filter(num_employees__lt=50).update(ceo=Employee.objects.get(firstname='Frank')) - inner = Company.objects.filter( - ceo=OuterRef('pk') - ).values('ceo').annotate(total_employees=Sum('num_employees')).values('total_employees') - outer = Employee.objects.annotate(total_employees=Subquery(inner)).filter(salary__lte=Subquery(inner)) + Company.objects.filter(num_employees__lt=50).update( + ceo=Employee.objects.get(firstname="Frank") + ) + inner = ( + Company.objects.filter(ceo=OuterRef("pk")) + .values("ceo") + .annotate(total_employees=Sum("num_employees")) + .values("total_employees") + ) + outer = Employee.objects.annotate(total_employees=Subquery(inner)).filter( + salary__lte=Subquery(inner) + ) self.assertSequenceEqual( - outer.order_by('-total_employees').values('salary', 'total_employees'), - [{'salary': 10, 'total_employees': 2300}, {'salary': 20, 'total_employees': 35}], + outer.order_by("-total_employees").values("salary", "total_employees"), + [ + {"salary": 10, "total_employees": 2300}, + {"salary": 20, "total_employees": 35}, + ], ) def test_subquery_references_joined_table_twice(self): inner = Company.objects.filter( - num_chairs__gte=OuterRef('ceo__salary'), - num_employees__gte=OuterRef('point_of_contact__salary'), + num_chairs__gte=OuterRef("ceo__salary"), + num_employees__gte=OuterRef("point_of_contact__salary"), ) # Another contrived example (there is no need to have a subquery here) - outer = Company.objects.filter(pk__in=Subquery(inner.values('pk'))) + outer = Company.objects.filter(pk__in=Subquery(inner.values("pk"))) self.assertFalse(outer.exists()) def test_subquery_filter_by_aggregate(self): @@ -644,14 +727,17 @@ class BasicExpressionsTests(TestCase): qs = Number.objects.annotate( min_valuable_count=Subquery( Employee.objects.filter( - salary=OuterRef('integer'), - ).annotate(cnt=Count('salary')).filter(cnt__gt=0).values('cnt')[:1] + salary=OuterRef("integer"), + ) + .annotate(cnt=Count("salary")) + .filter(cnt__gt=0) + .values("cnt")[:1] ), ) self.assertEqual(qs.get().float, 1.2) def test_subquery_filter_by_lazy(self): - self.max.manager = Manager.objects.create(name='Manager') + self.max.manager = Manager.objects.create(name="Manager") self.max.save() max_manager = SimpleLazyObject( lambda: Manager.objects.get(pk=self.max.manager.pk) @@ -659,8 +745,8 @@ class BasicExpressionsTests(TestCase): qs = Company.objects.annotate( ceo_manager=Subquery( Employee.objects.filter( - lastname=OuterRef('ceo__lastname'), - ).values('manager'), + lastname=OuterRef("ceo__lastname"), + ).values("manager"), ), ).filter(ceo_manager=max_manager) self.assertEqual(qs.get(), self.gmbh) @@ -670,32 +756,32 @@ class BasicExpressionsTests(TestCase): aggregate = Company.objects.annotate( ceo_salary=Subquery( Employee.objects.filter( - id=OuterRef('ceo_id'), - ).values('salary') + id=OuterRef("ceo_id"), + ).values("salary") ), ).aggregate( - ceo_salary_gt_20=Count('pk', filter=Q(ceo_salary__gt=20)), + ceo_salary_gt_20=Count("pk", filter=Q(ceo_salary__gt=20)), ) - self.assertEqual(aggregate, {'ceo_salary_gt_20': 1}) + self.assertEqual(aggregate, {"ceo_salary_gt_20": 1}) # Aggregation over a subquery annotation doesn't annotate the subquery # twice in the inner query. - sql = ctx.captured_queries[0]['sql'] - self.assertLessEqual(sql.count('SELECT'), 3) + sql = ctx.captured_queries[0]["sql"] + self.assertLessEqual(sql.count("SELECT"), 3) # GROUP BY isn't required to aggregate over a query that doesn't # contain nested aggregates. - self.assertNotIn('GROUP BY', sql) + self.assertNotIn("GROUP BY", sql) - @skipUnlessDBFeature('supports_over_clause') + @skipUnlessDBFeature("supports_over_clause") def test_aggregate_rawsql_annotation(self): with self.assertNumQueries(1) as ctx: aggregate = Company.objects.annotate( - salary=RawSQL('SUM(num_chairs) OVER (ORDER BY num_employees)', []), + salary=RawSQL("SUM(num_chairs) OVER (ORDER BY num_employees)", []), ).aggregate( - count=Count('pk'), + count=Count("pk"), ) - self.assertEqual(aggregate, {'count': 3}) - sql = ctx.captured_queries[0]['sql'] - self.assertNotIn('GROUP BY', sql) + self.assertEqual(aggregate, {"count": 3}) + sql = ctx.captured_queries[0]["sql"] + self.assertNotIn("GROUP BY", sql) def test_explicit_output_field(self): class FuncA(Func): @@ -708,57 +794,59 @@ class BasicExpressionsTests(TestCase): self.assertEqual(expr.output_field, FuncA.output_field) def test_outerref_mixed_case_table_name(self): - inner = Result.objects.filter(result_time__gte=OuterRef('experiment__assigned')) - outer = Result.objects.filter(pk__in=Subquery(inner.values('pk'))) + inner = Result.objects.filter(result_time__gte=OuterRef("experiment__assigned")) + outer = Result.objects.filter(pk__in=Subquery(inner.values("pk"))) self.assertFalse(outer.exists()) def test_outerref_with_operator(self): - inner = Company.objects.filter(num_employees=OuterRef('ceo__salary') + 2) - outer = Company.objects.filter(pk__in=Subquery(inner.values('pk'))) - self.assertEqual(outer.get().name, 'Test GmbH') + inner = Company.objects.filter(num_employees=OuterRef("ceo__salary") + 2) + outer = Company.objects.filter(pk__in=Subquery(inner.values("pk"))) + self.assertEqual(outer.get().name, "Test GmbH") def test_nested_outerref_with_function(self): - self.gmbh.point_of_contact = Employee.objects.get(lastname='Meyer') + self.gmbh.point_of_contact = Employee.objects.get(lastname="Meyer") self.gmbh.save() inner = Employee.objects.filter( - lastname__startswith=Left(OuterRef(OuterRef('lastname')), 1), + lastname__startswith=Left(OuterRef(OuterRef("lastname")), 1), ) qs = Employee.objects.annotate( ceo_company=Subquery( Company.objects.filter( point_of_contact__in=inner, - ceo__pk=OuterRef('pk'), - ).values('name'), + ceo__pk=OuterRef("pk"), + ).values("name"), ), ).filter(ceo_company__isnull=False) - self.assertEqual(qs.get().ceo_company, 'Test GmbH') + self.assertEqual(qs.get().ceo_company, "Test GmbH") def test_annotation_with_outerref(self): gmbh_salary = Company.objects.annotate( max_ceo_salary_raise=Subquery( Company.objects.annotate( - salary_raise=OuterRef('num_employees') + F('num_employees'), - ).order_by('-salary_raise').values('salary_raise')[:1], + salary_raise=OuterRef("num_employees") + F("num_employees"), + ) + .order_by("-salary_raise") + .values("salary_raise")[:1], output_field=IntegerField(), ), ).get(pk=self.gmbh.pk) self.assertEqual(gmbh_salary.max_ceo_salary_raise, 2332) def test_annotation_with_nested_outerref(self): - self.gmbh.point_of_contact = Employee.objects.get(lastname='Meyer') + self.gmbh.point_of_contact = Employee.objects.get(lastname="Meyer") self.gmbh.save() inner = Employee.objects.annotate( - outer_lastname=OuterRef(OuterRef('lastname')), - ).filter(lastname__startswith=Left('outer_lastname', 1)) + outer_lastname=OuterRef(OuterRef("lastname")), + ).filter(lastname__startswith=Left("outer_lastname", 1)) qs = Employee.objects.annotate( ceo_company=Subquery( Company.objects.filter( point_of_contact__in=inner, - ceo__pk=OuterRef('pk'), - ).values('name'), + ceo__pk=OuterRef("pk"), + ).values("name"), ), ).filter(ceo_company__isnull=False) - self.assertEqual(qs.get().ceo_company, 'Test GmbH') + self.assertEqual(qs.get().ceo_company, "Test GmbH") def test_pickle_expression(self): expr = Value(1) @@ -766,15 +854,19 @@ class BasicExpressionsTests(TestCase): self.assertEqual(pickle.loads(pickle.dumps(expr)), expr) def test_incorrect_field_in_F_expression(self): - with self.assertRaisesMessage(FieldError, "Cannot resolve keyword 'nope' into field."): - list(Employee.objects.filter(firstname=F('nope'))) + with self.assertRaisesMessage( + FieldError, "Cannot resolve keyword 'nope' into field." + ): + list(Employee.objects.filter(firstname=F("nope"))) def test_incorrect_joined_field_in_F_expression(self): - with self.assertRaisesMessage(FieldError, "Cannot resolve keyword 'nope' into field."): - list(Company.objects.filter(ceo__pk=F('point_of_contact__nope'))) + with self.assertRaisesMessage( + FieldError, "Cannot resolve keyword 'nope' into field." + ): + list(Company.objects.filter(ceo__pk=F("point_of_contact__nope"))) def test_exists_in_filter(self): - inner = Company.objects.filter(ceo=OuterRef('pk')).values('pk') + inner = Company.objects.filter(ceo=OuterRef("pk")).values("pk") qs1 = Employee.objects.filter(Exists(inner)) qs2 = Employee.objects.annotate(found=Exists(inner)).filter(found=True) self.assertCountEqual(qs1, qs2) @@ -782,23 +874,28 @@ class BasicExpressionsTests(TestCase): self.assertCountEqual(qs2, Employee.objects.exclude(~Exists(inner))) def test_subquery_in_filter(self): - inner = Company.objects.filter(ceo=OuterRef('pk')).values('based_in_eu') + inner = Company.objects.filter(ceo=OuterRef("pk")).values("based_in_eu") self.assertSequenceEqual( Employee.objects.filter(Subquery(inner)), [self.foobar_ltd.ceo], ) def test_subquery_group_by_outerref_in_filter(self): - inner = Company.objects.annotate( - employee=OuterRef('pk'), - ).values('employee').annotate( - min_num_chairs=Min('num_chairs'), - ).values('ceo') + inner = ( + Company.objects.annotate( + employee=OuterRef("pk"), + ) + .values("employee") + .annotate( + min_num_chairs=Min("num_chairs"), + ) + .values("ceo") + ) self.assertIs(Employee.objects.filter(pk__in=Subquery(inner)).exists(), True) def test_case_in_filter_if_boolean_output_field(self): - is_ceo = Company.objects.filter(ceo=OuterRef('pk')) - is_poc = Company.objects.filter(point_of_contact=OuterRef('pk')) + is_ceo = Company.objects.filter(ceo=OuterRef("pk")) + is_poc = Company.objects.filter(point_of_contact=OuterRef("pk")) qs = Employee.objects.filter( Case( When(Exists(is_ceo), then=True), @@ -810,8 +907,8 @@ class BasicExpressionsTests(TestCase): self.assertCountEqual(qs, [self.example_inc.ceo, self.foobar_ltd.ceo, self.max]) def test_boolean_expression_combined(self): - is_ceo = Company.objects.filter(ceo=OuterRef('pk')) - is_poc = Company.objects.filter(point_of_contact=OuterRef('pk')) + is_ceo = Company.objects.filter(ceo=OuterRef("pk")) + is_poc = Company.objects.filter(point_of_contact=OuterRef("pk")) self.gmbh.point_of_contact = self.max self.gmbh.save() self.assertCountEqual( @@ -840,7 +937,7 @@ class BasicExpressionsTests(TestCase): ) def test_boolean_expression_combined_with_empty_Q(self): - is_poc = Company.objects.filter(point_of_contact=OuterRef('pk')) + is_poc = Company.objects.filter(point_of_contact=OuterRef("pk")) self.gmbh.point_of_contact = self.max self.gmbh.save() tests = [ @@ -858,7 +955,7 @@ class BasicExpressionsTests(TestCase): self.assertCountEqual(Employee.objects.filter(conditions), [self.max]) def test_boolean_expression_in_Q(self): - is_poc = Company.objects.filter(point_of_contact=OuterRef('pk')) + is_poc = Company.objects.filter(point_of_contact=OuterRef("pk")) self.gmbh.point_of_contact = self.max self.gmbh.save() self.assertCountEqual(Employee.objects.filter(Q(Exists(is_poc))), [self.max]) @@ -867,27 +964,41 @@ class BasicExpressionsTests(TestCase): class IterableLookupInnerExpressionsTests(TestCase): @classmethod def setUpTestData(cls): - ceo = Employee.objects.create(firstname='Just', lastname='Doit', salary=30) + ceo = Employee.objects.create(firstname="Just", lastname="Doit", salary=30) # MySQL requires that the values calculated for expressions don't pass # outside of the field's range, so it's inconvenient to use the values # in the more general tests. - cls.c5020 = Company.objects.create(name='5020 Ltd', num_employees=50, num_chairs=20, ceo=ceo) - cls.c5040 = Company.objects.create(name='5040 Ltd', num_employees=50, num_chairs=40, ceo=ceo) - cls.c5050 = Company.objects.create(name='5050 Ltd', num_employees=50, num_chairs=50, ceo=ceo) - cls.c5060 = Company.objects.create(name='5060 Ltd', num_employees=50, num_chairs=60, ceo=ceo) - cls.c99300 = Company.objects.create(name='99300 Ltd', num_employees=99, num_chairs=300, ceo=ceo) + cls.c5020 = Company.objects.create( + name="5020 Ltd", num_employees=50, num_chairs=20, ceo=ceo + ) + cls.c5040 = Company.objects.create( + name="5040 Ltd", num_employees=50, num_chairs=40, ceo=ceo + ) + cls.c5050 = Company.objects.create( + name="5050 Ltd", num_employees=50, num_chairs=50, ceo=ceo + ) + cls.c5060 = Company.objects.create( + name="5060 Ltd", num_employees=50, num_chairs=60, ceo=ceo + ) + cls.c99300 = Company.objects.create( + name="99300 Ltd", num_employees=99, num_chairs=300, ceo=ceo + ) def test_in_lookup_allows_F_expressions_and_expressions_for_integers(self): # __in lookups can use F() expressions for integers. - queryset = Company.objects.filter(num_employees__in=([F('num_chairs') - 10])) + queryset = Company.objects.filter(num_employees__in=([F("num_chairs") - 10])) self.assertSequenceEqual(queryset, [self.c5060]) self.assertCountEqual( - Company.objects.filter(num_employees__in=([F('num_chairs') - 10, F('num_chairs') + 10])), + Company.objects.filter( + num_employees__in=([F("num_chairs") - 10, F("num_chairs") + 10]) + ), [self.c5040, self.c5060], ) self.assertCountEqual( Company.objects.filter( - num_employees__in=([F('num_chairs') - 10, F('num_chairs'), F('num_chairs') + 10]) + num_employees__in=( + [F("num_chairs") - 10, F("num_chairs"), F("num_chairs") + 10] + ) ), [self.c5040, self.c5050, self.c5060], ) @@ -901,13 +1012,17 @@ class IterableLookupInnerExpressionsTests(TestCase): SimulationRun.objects.create(start=None, end=t2, midpoint=midpoint) SimulationRun.objects.create(start=None, end=None, midpoint=midpoint) - queryset = SimulationRun.objects.filter(midpoint__range=[F('start__time'), F('end__time')]) + queryset = SimulationRun.objects.filter( + midpoint__range=[F("start__time"), F("end__time")] + ) self.assertSequenceEqual(queryset, [s1]) for alias in queryset.query.alias_map.values(): if isinstance(alias, Join): self.assertEqual(alias.join_type, constants.INNER) - queryset = SimulationRun.objects.exclude(midpoint__range=[F('start__time'), F('end__time')]) + queryset = SimulationRun.objects.exclude( + midpoint__range=[F("start__time"), F("end__time")] + ) self.assertQuerysetEqual(queryset, [], ordered=False) for alias in queryset.query.alias_map.values(): if isinstance(alias, Join): @@ -917,15 +1032,17 @@ class IterableLookupInnerExpressionsTests(TestCase): # Range lookups can use F() expressions for integers. Company.objects.filter(num_employees__exact=F("num_chairs")) self.assertCountEqual( - Company.objects.filter(num_employees__range=(F('num_chairs'), 100)), + Company.objects.filter(num_employees__range=(F("num_chairs"), 100)), [self.c5020, self.c5040, self.c5050], ) self.assertCountEqual( - Company.objects.filter(num_employees__range=(F('num_chairs') - 10, F('num_chairs') + 10)), + Company.objects.filter( + num_employees__range=(F("num_chairs") - 10, F("num_chairs") + 10) + ), [self.c5040, self.c5050, self.c5060], ) self.assertCountEqual( - Company.objects.filter(num_employees__range=(F('num_chairs') - 10, 100)), + Company.objects.filter(num_employees__range=(F("num_chairs") - 10, 100)), [self.c5020, self.c5040, self.c5050, self.c5060], ) self.assertCountEqual( @@ -934,15 +1051,19 @@ class IterableLookupInnerExpressionsTests(TestCase): ) def test_range_lookup_namedtuple(self): - EmployeeRange = namedtuple('EmployeeRange', ['minimum', 'maximum']) + EmployeeRange = namedtuple("EmployeeRange", ["minimum", "maximum"]) qs = Company.objects.filter( num_employees__range=EmployeeRange(minimum=51, maximum=100), ) self.assertSequenceEqual(qs, [self.c99300]) - @unittest.skipUnless(connection.vendor == 'sqlite', - "This defensive test only works on databases that don't validate parameter types") - def test_complex_expressions_do_not_introduce_sql_injection_via_untrusted_string_inclusion(self): + @unittest.skipUnless( + connection.vendor == "sqlite", + "This defensive test only works on databases that don't validate parameter types", + ) + def test_complex_expressions_do_not_introduce_sql_injection_via_untrusted_string_inclusion( + self, + ): """ This tests that SQL injection isn't possible using compilation of expressions in iterable filters, as their compilation happens before @@ -952,14 +1073,14 @@ class IterableLookupInnerExpressionsTests(TestCase): databases) demonstrates that the problem doesn't exist while keeping the test simple. """ - queryset = Company.objects.filter(name__in=[F('num_chairs') + '1)) OR ((1==1']) + queryset = Company.objects.filter(name__in=[F("num_chairs") + "1)) OR ((1==1"]) self.assertQuerysetEqual(queryset, [], ordered=False) def test_in_lookup_allows_F_expressions_and_expressions_for_datetimes(self): start = datetime.datetime(2016, 2, 3, 15, 0, 0) end = datetime.datetime(2016, 2, 5, 15, 0, 0) experiment_1 = Experiment.objects.create( - name='Integrity testing', + name="Integrity testing", assigned=start.date(), start=start, end=end, @@ -967,7 +1088,7 @@ class IterableLookupInnerExpressionsTests(TestCase): estimated_time=end - start, ) experiment_2 = Experiment.objects.create( - name='Taste testing', + name="Taste testing", assigned=start.date(), start=start, end=end, @@ -986,52 +1107,52 @@ class IterableLookupInnerExpressionsTests(TestCase): experiment=experiment_2, result_time=datetime.datetime(2016, 1, 8, 5, 0, 0), ) - within_experiment_time = [F('experiment__start'), F('experiment__end')] + within_experiment_time = [F("experiment__start"), F("experiment__end")] queryset = Result.objects.filter(result_time__range=within_experiment_time) self.assertSequenceEqual(queryset, [r1]) class FTests(SimpleTestCase): - def test_deepcopy(self): f = F("foo") g = deepcopy(f) self.assertEqual(f.name, g.name) def test_deconstruct(self): - f = F('name') + f = F("name") path, args, kwargs = f.deconstruct() - self.assertEqual(path, 'django.db.models.F') + self.assertEqual(path, "django.db.models.F") self.assertEqual(args, (f.name,)) self.assertEqual(kwargs, {}) def test_equal(self): - f = F('name') - same_f = F('name') - other_f = F('username') + f = F("name") + same_f = F("name") + other_f = F("username") self.assertEqual(f, same_f) self.assertNotEqual(f, other_f) def test_hash(self): - d = {F('name'): 'Bob'} - self.assertIn(F('name'), d) - self.assertEqual(d[F('name')], 'Bob') + d = {F("name"): "Bob"} + self.assertIn(F("name"), d) + self.assertEqual(d[F("name")], "Bob") def test_not_equal_Value(self): - f = F('name') - value = Value('name') + f = F("name") + value = Value("name") self.assertNotEqual(f, value) self.assertNotEqual(value, f) class ExpressionsTests(TestCase): - def test_F_reuse(self): - f = F('id') + f = F("id") n = Number.objects.create(integer=-1) c = Company.objects.create( - name="Example Inc.", num_employees=2300, num_chairs=5, - ceo=Employee.objects.create(firstname="Joe", lastname="Smith") + name="Example Inc.", + num_employees=2300, + num_chairs=5, + ceo=Employee.objects.create(firstname="Joe", lastname="Smith"), ) c_qs = Company.objects.filter(id=f) self.assertEqual(c_qs.get(), c) @@ -1047,27 +1168,29 @@ class ExpressionsTests(TestCase): properly escaped when using a pattern lookup with an expression refs #16731 """ - Employee.objects.bulk_create([ - Employee(firstname="Johnny", lastname="%John"), - Employee(firstname="Jean-Claude", lastname="Claud_"), - Employee(firstname="Jean-Claude", lastname="Claude%"), - Employee(firstname="Johnny", lastname="Joh\\n"), - Employee(firstname="Johnny", lastname="_ohn"), - ]) - claude = Employee.objects.create(firstname='Jean-Claude', lastname='Claude') - john = Employee.objects.create(firstname='Johnny', lastname='John') - john_sign = Employee.objects.create(firstname='%Joh\\nny', lastname='%Joh\\n') + Employee.objects.bulk_create( + [ + Employee(firstname="Johnny", lastname="%John"), + Employee(firstname="Jean-Claude", lastname="Claud_"), + Employee(firstname="Jean-Claude", lastname="Claude%"), + Employee(firstname="Johnny", lastname="Joh\\n"), + Employee(firstname="Johnny", lastname="_ohn"), + ] + ) + claude = Employee.objects.create(firstname="Jean-Claude", lastname="Claude") + john = Employee.objects.create(firstname="Johnny", lastname="John") + john_sign = Employee.objects.create(firstname="%Joh\\nny", lastname="%Joh\\n") self.assertCountEqual( - Employee.objects.filter(firstname__contains=F('lastname')), + Employee.objects.filter(firstname__contains=F("lastname")), [john_sign, john, claude], ) self.assertCountEqual( - Employee.objects.filter(firstname__startswith=F('lastname')), + Employee.objects.filter(firstname__startswith=F("lastname")), [john_sign, john], ) self.assertSequenceEqual( - Employee.objects.filter(firstname__endswith=F('lastname')), + Employee.objects.filter(firstname__endswith=F("lastname")), [claude], ) @@ -1077,60 +1200,57 @@ class ExpressionsTests(TestCase): properly escaped when using a case insensitive pattern lookup with an expression -- refs #16731 """ - Employee.objects.bulk_create([ - Employee(firstname="Johnny", lastname="%john"), - Employee(firstname="Jean-Claude", lastname="claud_"), - Employee(firstname="Jean-Claude", lastname="claude%"), - Employee(firstname="Johnny", lastname="joh\\n"), - Employee(firstname="Johnny", lastname="_ohn"), - ]) - claude = Employee.objects.create(firstname='Jean-Claude', lastname='claude') - john = Employee.objects.create(firstname='Johnny', lastname='john') - john_sign = Employee.objects.create(firstname='%Joh\\nny', lastname='%joh\\n') + Employee.objects.bulk_create( + [ + Employee(firstname="Johnny", lastname="%john"), + Employee(firstname="Jean-Claude", lastname="claud_"), + Employee(firstname="Jean-Claude", lastname="claude%"), + Employee(firstname="Johnny", lastname="joh\\n"), + Employee(firstname="Johnny", lastname="_ohn"), + ] + ) + claude = Employee.objects.create(firstname="Jean-Claude", lastname="claude") + john = Employee.objects.create(firstname="Johnny", lastname="john") + john_sign = Employee.objects.create(firstname="%Joh\\nny", lastname="%joh\\n") self.assertCountEqual( - Employee.objects.filter(firstname__icontains=F('lastname')), + Employee.objects.filter(firstname__icontains=F("lastname")), [john_sign, john, claude], ) self.assertCountEqual( - Employee.objects.filter(firstname__istartswith=F('lastname')), + Employee.objects.filter(firstname__istartswith=F("lastname")), [john_sign, john], ) self.assertSequenceEqual( - Employee.objects.filter(firstname__iendswith=F('lastname')), + Employee.objects.filter(firstname__iendswith=F("lastname")), [claude], ) -@isolate_apps('expressions') +@isolate_apps("expressions") class SimpleExpressionTests(SimpleTestCase): - def test_equal(self): self.assertEqual(Expression(), Expression()) self.assertEqual( - Expression(IntegerField()), - Expression(output_field=IntegerField()) + Expression(IntegerField()), Expression(output_field=IntegerField()) ) self.assertEqual(Expression(IntegerField()), mock.ANY) - self.assertNotEqual( - Expression(IntegerField()), - Expression(CharField()) - ) + self.assertNotEqual(Expression(IntegerField()), Expression(CharField())) class TestModel(Model): field = IntegerField() other_field = IntegerField() self.assertNotEqual( - Expression(TestModel._meta.get_field('field')), - Expression(TestModel._meta.get_field('other_field')), + Expression(TestModel._meta.get_field("field")), + Expression(TestModel._meta.get_field("other_field")), ) def test_hash(self): self.assertEqual(hash(Expression()), hash(Expression())) self.assertEqual( hash(Expression(IntegerField())), - hash(Expression(output_field=IntegerField())) + hash(Expression(output_field=IntegerField())), ) self.assertNotEqual( hash(Expression(IntegerField())), @@ -1142,19 +1262,18 @@ class SimpleExpressionTests(SimpleTestCase): other_field = IntegerField() self.assertNotEqual( - hash(Expression(TestModel._meta.get_field('field'))), - hash(Expression(TestModel._meta.get_field('other_field'))), + hash(Expression(TestModel._meta.get_field("field"))), + hash(Expression(TestModel._meta.get_field("other_field"))), ) class ExpressionsNumericTests(TestCase): - @classmethod def setUpTestData(cls): Number(integer=-1).save() Number(integer=42).save() Number(integer=1337).save() - Number.objects.update(float=F('integer')) + Number.objects.update(float=F("integer")) def test_fill_with_value_from_same_object(self): """ @@ -1165,19 +1284,21 @@ class ExpressionsNumericTests(TestCase): Number.objects.all(), [(-1, -1), (42, 42), (1337, 1337)], lambda n: (n.integer, round(n.float)), - ordered=False + ordered=False, ) def test_increment_value(self): """ We can increment a value of all objects in a query set. """ - self.assertEqual(Number.objects.filter(integer__gt=0).update(integer=F('integer') + 1), 2) + self.assertEqual( + Number.objects.filter(integer__gt=0).update(integer=F("integer") + 1), 2 + ) self.assertQuerysetEqual( Number.objects.all(), [(-1, -1), (43, 42), (1338, 1337)], lambda n: (n.integer, round(n.float)), - ordered=False + ordered=False, ) def test_filter_not_equals_other_field(self): @@ -1185,19 +1306,21 @@ class ExpressionsNumericTests(TestCase): We can filter for objects, where a value is not equals the value of an other field. """ - self.assertEqual(Number.objects.filter(integer__gt=0).update(integer=F('integer') + 1), 2) + self.assertEqual( + Number.objects.filter(integer__gt=0).update(integer=F("integer") + 1), 2 + ) self.assertQuerysetEqual( - Number.objects.exclude(float=F('integer')), + Number.objects.exclude(float=F("integer")), [(43, 42), (1338, 1337)], lambda n: (n.integer, round(n.float)), - ordered=False + ordered=False, ) def test_filter_decimal_expression(self): - obj = Number.objects.create(integer=0, float=1, decimal_value=Decimal('1')) + obj = Number.objects.create(integer=0, float=1, decimal_value=Decimal("1")) qs = Number.objects.annotate( x=ExpressionWrapper(Value(1), output_field=DecimalField()), - ).filter(Q(x=1, integer=0) & Q(x=Decimal('1'))) + ).filter(Q(x=1, integer=0) & Q(x=Decimal("1"))) self.assertSequenceEqual(qs, [obj]) def test_complex_expressions(self): @@ -1205,18 +1328,22 @@ class ExpressionsNumericTests(TestCase): Complex expressions of different connection types are possible. """ n = Number.objects.create(integer=10, float=123.45) - self.assertEqual(Number.objects.filter(pk=n.pk).update( - float=F('integer') + F('float') * 2), 1) + self.assertEqual( + Number.objects.filter(pk=n.pk).update(float=F("integer") + F("float") * 2), + 1, + ) self.assertEqual(Number.objects.get(pk=n.pk).integer, 10) - self.assertEqual(Number.objects.get(pk=n.pk).float, Approximate(256.900, places=3)) + self.assertEqual( + Number.objects.get(pk=n.pk).float, Approximate(256.900, places=3) + ) def test_decimal_expression(self): - n = Number.objects.create(integer=1, decimal_value=Decimal('0.5')) - n.decimal_value = F('decimal_value') - Decimal('0.4') + n = Number.objects.create(integer=1, decimal_value=Decimal("0.5")) + n.decimal_value = F("decimal_value") - Decimal("0.4") n.save() n.refresh_from_db() - self.assertEqual(n.decimal_value, Decimal('0.1')) + self.assertEqual(n.decimal_value, Decimal("0.1")) class ExpressionOperatorTests(TestCase): @@ -1228,149 +1355,187 @@ class ExpressionOperatorTests(TestCase): def test_lefthand_addition(self): # LH Addition of floats and integers Number.objects.filter(pk=self.n.pk).update( - integer=F('integer') + 15, - float=F('float') + 42.7 + integer=F("integer") + 15, float=F("float") + 42.7 ) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 57) - self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(58.200, places=3)) + self.assertEqual( + Number.objects.get(pk=self.n.pk).float, Approximate(58.200, places=3) + ) def test_lefthand_subtraction(self): # LH Subtraction of floats and integers - Number.objects.filter(pk=self.n.pk).update(integer=F('integer') - 15, float=F('float') - 42.7) + Number.objects.filter(pk=self.n.pk).update( + integer=F("integer") - 15, float=F("float") - 42.7 + ) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 27) - self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(-27.200, places=3)) + self.assertEqual( + Number.objects.get(pk=self.n.pk).float, Approximate(-27.200, places=3) + ) def test_lefthand_multiplication(self): # Multiplication of floats and integers - Number.objects.filter(pk=self.n.pk).update(integer=F('integer') * 15, float=F('float') * 42.7) + Number.objects.filter(pk=self.n.pk).update( + integer=F("integer") * 15, float=F("float") * 42.7 + ) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 630) - self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(661.850, places=3)) + self.assertEqual( + Number.objects.get(pk=self.n.pk).float, Approximate(661.850, places=3) + ) def test_lefthand_division(self): # LH Division of floats and integers - Number.objects.filter(pk=self.n.pk).update(integer=F('integer') / 2, float=F('float') / 42.7) + Number.objects.filter(pk=self.n.pk).update( + integer=F("integer") / 2, float=F("float") / 42.7 + ) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 21) - self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(0.363, places=3)) + self.assertEqual( + Number.objects.get(pk=self.n.pk).float, Approximate(0.363, places=3) + ) def test_lefthand_modulo(self): # LH Modulo arithmetic on integers - Number.objects.filter(pk=self.n.pk).update(integer=F('integer') % 20) + Number.objects.filter(pk=self.n.pk).update(integer=F("integer") % 20) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 2) def test_lefthand_modulo_null(self): # LH Modulo arithmetic on integers. - Employee.objects.create(firstname='John', lastname='Doe', salary=None) - qs = Employee.objects.annotate(modsalary=F('salary') % 20) + Employee.objects.create(firstname="John", lastname="Doe", salary=None) + qs = Employee.objects.annotate(modsalary=F("salary") % 20) self.assertIsNone(qs.get().salary) def test_lefthand_bitwise_and(self): # LH Bitwise ands on integers - Number.objects.filter(pk=self.n.pk).update(integer=F('integer').bitand(56)) - Number.objects.filter(pk=self.n1.pk).update(integer=F('integer').bitand(-56)) + Number.objects.filter(pk=self.n.pk).update(integer=F("integer").bitand(56)) + Number.objects.filter(pk=self.n1.pk).update(integer=F("integer").bitand(-56)) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 40) self.assertEqual(Number.objects.get(pk=self.n1.pk).integer, -64) def test_lefthand_bitwise_left_shift_operator(self): - Number.objects.update(integer=F('integer').bitleftshift(2)) + Number.objects.update(integer=F("integer").bitleftshift(2)) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 168) self.assertEqual(Number.objects.get(pk=self.n1.pk).integer, -168) def test_lefthand_bitwise_right_shift_operator(self): - Number.objects.update(integer=F('integer').bitrightshift(2)) + Number.objects.update(integer=F("integer").bitrightshift(2)) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 10) self.assertEqual(Number.objects.get(pk=self.n1.pk).integer, -11) def test_lefthand_bitwise_or(self): # LH Bitwise or on integers - Number.objects.update(integer=F('integer').bitor(48)) + Number.objects.update(integer=F("integer").bitor(48)) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 58) self.assertEqual(Number.objects.get(pk=self.n1.pk).integer, -10) def test_lefthand_transformed_field_bitwise_or(self): - Employee.objects.create(firstname='Max', lastname='Mustermann') + Employee.objects.create(firstname="Max", lastname="Mustermann") with register_lookup(CharField, Length): - qs = Employee.objects.annotate(bitor=F('lastname__length').bitor(48)) + qs = Employee.objects.annotate(bitor=F("lastname__length").bitor(48)) self.assertEqual(qs.get().bitor, 58) def test_lefthand_power(self): # LH Power arithmetic operation on floats and integers - Number.objects.filter(pk=self.n.pk).update(integer=F('integer') ** 2, float=F('float') ** 1.5) + Number.objects.filter(pk=self.n.pk).update( + integer=F("integer") ** 2, float=F("float") ** 1.5 + ) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 1764) - self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(61.02, places=2)) + self.assertEqual( + Number.objects.get(pk=self.n.pk).float, Approximate(61.02, places=2) + ) def test_lefthand_bitwise_xor(self): - Number.objects.update(integer=F('integer').bitxor(48)) + Number.objects.update(integer=F("integer").bitxor(48)) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 26) self.assertEqual(Number.objects.get(pk=self.n1.pk).integer, -26) def test_lefthand_bitwise_xor_null(self): - employee = Employee.objects.create(firstname='John', lastname='Doe') - Employee.objects.update(salary=F('salary').bitxor(48)) + employee = Employee.objects.create(firstname="John", lastname="Doe") + Employee.objects.update(salary=F("salary").bitxor(48)) employee.refresh_from_db() self.assertIsNone(employee.salary) def test_lefthand_bitwise_xor_right_null(self): - employee = Employee.objects.create(firstname='John', lastname='Doe', salary=48) - Employee.objects.update(salary=F('salary').bitxor(None)) + employee = Employee.objects.create(firstname="John", lastname="Doe", salary=48) + Employee.objects.update(salary=F("salary").bitxor(None)) employee.refresh_from_db() self.assertIsNone(employee.salary) - @unittest.skipUnless(connection.vendor == 'oracle', "Oracle doesn't support bitwise XOR.") + @unittest.skipUnless( + connection.vendor == "oracle", "Oracle doesn't support bitwise XOR." + ) def test_lefthand_bitwise_xor_not_supported(self): - msg = 'Bitwise XOR is not supported in Oracle.' + msg = "Bitwise XOR is not supported in Oracle." with self.assertRaisesMessage(NotSupportedError, msg): - Number.objects.update(integer=F('integer').bitxor(48)) + Number.objects.update(integer=F("integer").bitxor(48)) def test_right_hand_addition(self): # Right hand operators - Number.objects.filter(pk=self.n.pk).update(integer=15 + F('integer'), float=42.7 + F('float')) + Number.objects.filter(pk=self.n.pk).update( + integer=15 + F("integer"), float=42.7 + F("float") + ) # RH Addition of floats and integers self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 57) - self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(58.200, places=3)) + self.assertEqual( + Number.objects.get(pk=self.n.pk).float, Approximate(58.200, places=3) + ) def test_right_hand_subtraction(self): - Number.objects.filter(pk=self.n.pk).update(integer=15 - F('integer'), float=42.7 - F('float')) + Number.objects.filter(pk=self.n.pk).update( + integer=15 - F("integer"), float=42.7 - F("float") + ) # RH Subtraction of floats and integers self.assertEqual(Number.objects.get(pk=self.n.pk).integer, -27) - self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(27.200, places=3)) + self.assertEqual( + Number.objects.get(pk=self.n.pk).float, Approximate(27.200, places=3) + ) def test_right_hand_multiplication(self): # RH Multiplication of floats and integers - Number.objects.filter(pk=self.n.pk).update(integer=15 * F('integer'), float=42.7 * F('float')) + Number.objects.filter(pk=self.n.pk).update( + integer=15 * F("integer"), float=42.7 * F("float") + ) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 630) - self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(661.850, places=3)) + self.assertEqual( + Number.objects.get(pk=self.n.pk).float, Approximate(661.850, places=3) + ) def test_right_hand_division(self): # RH Division of floats and integers - Number.objects.filter(pk=self.n.pk).update(integer=640 / F('integer'), float=42.7 / F('float')) + Number.objects.filter(pk=self.n.pk).update( + integer=640 / F("integer"), float=42.7 / F("float") + ) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 15) - self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(2.755, places=3)) + self.assertEqual( + Number.objects.get(pk=self.n.pk).float, Approximate(2.755, places=3) + ) def test_right_hand_modulo(self): # RH Modulo arithmetic on integers - Number.objects.filter(pk=self.n.pk).update(integer=69 % F('integer')) + Number.objects.filter(pk=self.n.pk).update(integer=69 % F("integer")) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 27) def test_righthand_power(self): # RH Power arithmetic operation on floats and integers - Number.objects.filter(pk=self.n.pk).update(integer=2 ** F('integer'), float=1.5 ** F('float')) + Number.objects.filter(pk=self.n.pk).update( + integer=2 ** F("integer"), float=1.5 ** F("float") + ) self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 4398046511104) - self.assertEqual(Number.objects.get(pk=self.n.pk).float, Approximate(536.308, places=3)) + self.assertEqual( + Number.objects.get(pk=self.n.pk).float, Approximate(536.308, places=3) + ) class FTimeDeltaTests(TestCase): - @classmethod def setUpTestData(cls): cls.sday = sday = datetime.date(2010, 6, 25) @@ -1393,11 +1558,17 @@ class FTimeDeltaTests(TestCase): # e0: started same day as assigned, zero duration end = stime + delta0 cls.e0 = Experiment.objects.create( - name='e0', assigned=sday, start=stime, end=end, - completed=end.date(), estimated_time=delta0, + name="e0", + assigned=sday, + start=stime, + end=end, + completed=end.date(), + estimated_time=delta0, ) cls.deltas.append(delta0) - cls.delays.append(cls.e0.start - datetime.datetime.combine(cls.e0.assigned, midnight)) + cls.delays.append( + cls.e0.start - datetime.datetime.combine(cls.e0.assigned, midnight) + ) cls.days_long.append(cls.e0.completed - cls.e0.assigned) # e1: started one day after assigned, tiny duration, data @@ -1406,8 +1577,12 @@ class FTimeDeltaTests(TestCase): delay = datetime.timedelta(1) end = stime + delay + delta1 e1 = Experiment.objects.create( - name='e1', assigned=sday, start=stime + delay, end=end, - completed=end.date(), estimated_time=delta1, + name="e1", + assigned=sday, + start=stime + delay, + end=end, + completed=end.date(), + estimated_time=delta1, ) cls.deltas.append(delta1) cls.delays.append(e1.start - datetime.datetime.combine(e1.assigned, midnight)) @@ -1416,8 +1591,12 @@ class FTimeDeltaTests(TestCase): # e2: started three days after assigned, small duration end = stime + delta2 e2 = Experiment.objects.create( - name='e2', assigned=sday - datetime.timedelta(3), start=stime, - end=end, completed=end.date(), estimated_time=datetime.timedelta(hours=1), + name="e2", + assigned=sday - datetime.timedelta(3), + start=stime, + end=end, + completed=end.date(), + estimated_time=datetime.timedelta(hours=1), ) cls.deltas.append(delta2) cls.delays.append(e2.start - datetime.datetime.combine(e2.assigned, midnight)) @@ -1427,8 +1606,12 @@ class FTimeDeltaTests(TestCase): delay = datetime.timedelta(4) end = stime + delay + delta3 e3 = Experiment.objects.create( - name='e3', assigned=sday, start=stime + delay, end=end, - completed=end.date(), estimated_time=delta3, + name="e3", + assigned=sday, + start=stime + delay, + end=end, + completed=end.date(), + estimated_time=delta3, ) cls.deltas.append(delta3) cls.delays.append(e3.start - datetime.datetime.combine(e3.assigned, midnight)) @@ -1437,8 +1620,12 @@ class FTimeDeltaTests(TestCase): # e4: started 10 days after assignment, long duration end = stime + delta4 e4 = Experiment.objects.create( - name='e4', assigned=sday - datetime.timedelta(10), start=stime, - end=end, completed=end.date(), estimated_time=delta4 - datetime.timedelta(1), + name="e4", + assigned=sday - datetime.timedelta(10), + start=stime, + end=end, + completed=end.date(), + estimated_time=delta4 - datetime.timedelta(1), ) cls.deltas.append(delta4) cls.delays.append(e4.start - datetime.datetime.combine(e4.assigned, midnight)) @@ -1448,8 +1635,12 @@ class FTimeDeltaTests(TestCase): delay = datetime.timedelta(30) end = stime + delay + delta5 e5 = Experiment.objects.create( - name='e5', assigned=sday, start=stime + delay, end=end, - completed=end.date(), estimated_time=delta5, + name="e5", + assigned=sday, + start=stime + delay, + end=end, + completed=end.date(), + estimated_time=delta5, ) cls.deltas.append(delta5) cls.delays.append(e5.start - datetime.datetime.combine(e5.assigned, midnight)) @@ -1459,14 +1650,16 @@ class FTimeDeltaTests(TestCase): def test_multiple_query_compilation(self): # Ticket #21643 - queryset = Experiment.objects.filter(end__lt=F('start') + datetime.timedelta(hours=1)) + queryset = Experiment.objects.filter( + end__lt=F("start") + datetime.timedelta(hours=1) + ) q1 = str(queryset.query) q2 = str(queryset.query) self.assertEqual(q1, q2) def test_query_clone(self): # Ticket #21643 - Crash when compiling query more than once - qs = Experiment.objects.filter(end__lt=F('start') + datetime.timedelta(hours=1)) + qs = Experiment.objects.filter(end__lt=F("start") + datetime.timedelta(hours=1)) qs2 = qs.all() list(qs) list(qs2) @@ -1474,57 +1667,89 @@ class FTimeDeltaTests(TestCase): def test_delta_add(self): for i, delta in enumerate(self.deltas): - test_set = [e.name for e in Experiment.objects.filter(end__lt=F('start') + delta)] + test_set = [ + e.name for e in Experiment.objects.filter(end__lt=F("start") + delta) + ] self.assertEqual(test_set, self.expnames[:i]) - test_set = [e.name for e in Experiment.objects.filter(end__lt=delta + F('start'))] + test_set = [ + e.name for e in Experiment.objects.filter(end__lt=delta + F("start")) + ] self.assertEqual(test_set, self.expnames[:i]) - test_set = [e.name for e in Experiment.objects.filter(end__lte=F('start') + delta)] - self.assertEqual(test_set, self.expnames[:i + 1]) + test_set = [ + e.name for e in Experiment.objects.filter(end__lte=F("start") + delta) + ] + self.assertEqual(test_set, self.expnames[: i + 1]) def test_delta_subtract(self): for i, delta in enumerate(self.deltas): - test_set = [e.name for e in Experiment.objects.filter(start__gt=F('end') - delta)] + test_set = [ + e.name for e in Experiment.objects.filter(start__gt=F("end") - delta) + ] self.assertEqual(test_set, self.expnames[:i]) - test_set = [e.name for e in Experiment.objects.filter(start__gte=F('end') - delta)] - self.assertEqual(test_set, self.expnames[:i + 1]) + test_set = [ + e.name for e in Experiment.objects.filter(start__gte=F("end") - delta) + ] + self.assertEqual(test_set, self.expnames[: i + 1]) def test_exclude(self): for i, delta in enumerate(self.deltas): - test_set = [e.name for e in Experiment.objects.exclude(end__lt=F('start') + delta)] + test_set = [ + e.name for e in Experiment.objects.exclude(end__lt=F("start") + delta) + ] self.assertEqual(test_set, self.expnames[i:]) - test_set = [e.name for e in Experiment.objects.exclude(end__lte=F('start') + delta)] - self.assertEqual(test_set, self.expnames[i + 1:]) + test_set = [ + e.name for e in Experiment.objects.exclude(end__lte=F("start") + delta) + ] + self.assertEqual(test_set, self.expnames[i + 1 :]) def test_date_comparison(self): for i, days in enumerate(self.days_long): - test_set = [e.name for e in Experiment.objects.filter(completed__lt=F('assigned') + days)] + test_set = [ + e.name + for e in Experiment.objects.filter(completed__lt=F("assigned") + days) + ] self.assertEqual(test_set, self.expnames[:i]) - test_set = [e.name for e in Experiment.objects.filter(completed__lte=F('assigned') + days)] - self.assertEqual(test_set, self.expnames[:i + 1]) + test_set = [ + e.name + for e in Experiment.objects.filter(completed__lte=F("assigned") + days) + ] + self.assertEqual(test_set, self.expnames[: i + 1]) def test_mixed_comparisons1(self): for i, delay in enumerate(self.delays): - test_set = [e.name for e in Experiment.objects.filter(assigned__gt=F('start') - delay)] + test_set = [ + e.name + for e in Experiment.objects.filter(assigned__gt=F("start") - delay) + ] self.assertEqual(test_set, self.expnames[:i]) - test_set = [e.name for e in Experiment.objects.filter(assigned__gte=F('start') - delay)] - self.assertEqual(test_set, self.expnames[:i + 1]) + test_set = [ + e.name + for e in Experiment.objects.filter(assigned__gte=F("start") - delay) + ] + self.assertEqual(test_set, self.expnames[: i + 1]) def test_mixed_comparisons2(self): for i, delay in enumerate(self.delays): delay = datetime.timedelta(delay.days) - test_set = [e.name for e in Experiment.objects.filter(start__lt=F('assigned') + delay)] + test_set = [ + e.name + for e in Experiment.objects.filter(start__lt=F("assigned") + delay) + ] self.assertEqual(test_set, self.expnames[:i]) test_set = [ - e.name for e in Experiment.objects.filter(start__lte=F('assigned') + delay + datetime.timedelta(1)) + e.name + for e in Experiment.objects.filter( + start__lte=F("assigned") + delay + datetime.timedelta(1) + ) ] - self.assertEqual(test_set, self.expnames[:i + 1]) + self.assertEqual(test_set, self.expnames[: i + 1]) def test_delta_update(self): for delta in self.deltas: @@ -1533,7 +1758,7 @@ class FTimeDeltaTests(TestCase): expected_starts = [e.start + delta for e in exps] expected_ends = [e.end + delta for e in exps] - Experiment.objects.update(start=F('start') + delta, end=F('end') + delta) + Experiment.objects.update(start=F("start") + delta, end=F("end") + delta) exps = Experiment.objects.all() new_starts = [e.start for e in exps] new_ends = [e.end for e in exps] @@ -1544,32 +1769,42 @@ class FTimeDeltaTests(TestCase): def test_invalid_operator(self): with self.assertRaises(DatabaseError): - list(Experiment.objects.filter(start=F('start') * datetime.timedelta(0))) + list(Experiment.objects.filter(start=F("start") * datetime.timedelta(0))) def test_durationfield_add(self): - zeros = [e.name for e in Experiment.objects.filter(start=F('start') + F('estimated_time'))] - self.assertEqual(zeros, ['e0']) + zeros = [ + e.name + for e in Experiment.objects.filter(start=F("start") + F("estimated_time")) + ] + self.assertEqual(zeros, ["e0"]) - end_less = [e.name for e in Experiment.objects.filter(end__lt=F('start') + F('estimated_time'))] - self.assertEqual(end_less, ['e2']) + end_less = [ + e.name + for e in Experiment.objects.filter(end__lt=F("start") + F("estimated_time")) + ] + self.assertEqual(end_less, ["e2"]) delta_math = [ - e.name for e in - Experiment.objects.filter(end__gte=F('start') + F('estimated_time') + datetime.timedelta(hours=1)) + e.name + for e in Experiment.objects.filter( + end__gte=F("start") + F("estimated_time") + datetime.timedelta(hours=1) + ) ] - self.assertEqual(delta_math, ['e4']) + self.assertEqual(delta_math, ["e4"]) - queryset = Experiment.objects.annotate(shifted=ExpressionWrapper( - F('start') + Value(None, output_field=DurationField()), - output_field=DateTimeField(), - )) + queryset = Experiment.objects.annotate( + shifted=ExpressionWrapper( + F("start") + Value(None, output_field=DurationField()), + output_field=DateTimeField(), + ) + ) self.assertIsNone(queryset.first().shifted) def test_durationfield_multiply_divide(self): Experiment.objects.update(scalar=2) tests = [ - (Decimal('2'), 2), - (F('scalar'), 2), + (Decimal("2"), 2), + (F("scalar"), 2), (2, 2), (3.2, 3.2), ] @@ -1577,11 +1812,11 @@ class FTimeDeltaTests(TestCase): with self.subTest(expr=expr): qs = Experiment.objects.annotate( multiplied=ExpressionWrapper( - expr * F('estimated_time'), + expr * F("estimated_time"), output_field=DurationField(), ), divided=ExpressionWrapper( - F('estimated_time') / expr, + F("estimated_time") / expr, output_field=DurationField(), ), ) @@ -1597,126 +1832,147 @@ class FTimeDeltaTests(TestCase): def test_duration_expressions(self): for delta in self.deltas: - qs = Experiment.objects.annotate(duration=F('estimated_time') + delta) + qs = Experiment.objects.annotate(duration=F("estimated_time") + delta) for obj in qs: self.assertEqual(obj.duration, obj.estimated_time + delta) - @skipUnlessDBFeature('supports_temporal_subtraction') + @skipUnlessDBFeature("supports_temporal_subtraction") def test_date_subtraction(self): queryset = Experiment.objects.annotate( - completion_duration=F('completed') - F('assigned'), + completion_duration=F("completed") - F("assigned"), ) - at_least_5_days = {e.name for e in queryset.filter(completion_duration__gte=datetime.timedelta(days=5))} - self.assertEqual(at_least_5_days, {'e3', 'e4', 'e5'}) + at_least_5_days = { + e.name + for e in queryset.filter( + completion_duration__gte=datetime.timedelta(days=5) + ) + } + self.assertEqual(at_least_5_days, {"e3", "e4", "e5"}) - at_least_120_days = {e.name for e in queryset.filter(completion_duration__gte=datetime.timedelta(days=120))} - self.assertEqual(at_least_120_days, {'e5'}) + at_least_120_days = { + e.name + for e in queryset.filter( + completion_duration__gte=datetime.timedelta(days=120) + ) + } + self.assertEqual(at_least_120_days, {"e5"}) - less_than_5_days = {e.name for e in queryset.filter(completion_duration__lt=datetime.timedelta(days=5))} - self.assertEqual(less_than_5_days, {'e0', 'e1', 'e2'}) + less_than_5_days = { + e.name + for e in queryset.filter(completion_duration__lt=datetime.timedelta(days=5)) + } + self.assertEqual(less_than_5_days, {"e0", "e1", "e2"}) queryset = Experiment.objects.annotate( - difference=F('completed') - Value(None, output_field=DateField()), + difference=F("completed") - Value(None, output_field=DateField()), ) self.assertIsNone(queryset.first().difference) - queryset = Experiment.objects.annotate(shifted=ExpressionWrapper( - F('completed') - Value(None, output_field=DurationField()), - output_field=DateField(), - )) + queryset = Experiment.objects.annotate( + shifted=ExpressionWrapper( + F("completed") - Value(None, output_field=DurationField()), + output_field=DateField(), + ) + ) self.assertIsNone(queryset.first().shifted) - @skipUnlessDBFeature('supports_temporal_subtraction') + @skipUnlessDBFeature("supports_temporal_subtraction") def test_date_subquery_subtraction(self): - subquery = Experiment.objects.filter(pk=OuterRef('pk')).values('completed') + subquery = Experiment.objects.filter(pk=OuterRef("pk")).values("completed") queryset = Experiment.objects.annotate( - difference=subquery - F('completed'), + difference=subquery - F("completed"), ).filter(difference=datetime.timedelta()) self.assertTrue(queryset.exists()) - @skipUnlessDBFeature('supports_temporal_subtraction') + @skipUnlessDBFeature("supports_temporal_subtraction") def test_date_case_subtraction(self): queryset = Experiment.objects.annotate( date_case=Case( - When(Q(name='e0'), then=F('completed')), + When(Q(name="e0"), then=F("completed")), output_field=DateField(), ), completed_value=Value( self.e0.completed, output_field=DateField(), ), - difference=F('date_case') - F('completed_value'), + difference=F("date_case") - F("completed_value"), ).filter(difference=datetime.timedelta()) self.assertEqual(queryset.get(), self.e0) - @skipUnlessDBFeature('supports_temporal_subtraction') + @skipUnlessDBFeature("supports_temporal_subtraction") def test_time_subtraction(self): Time.objects.create(time=datetime.time(12, 30, 15, 2345)) queryset = Time.objects.annotate( - difference=F('time') - Value(datetime.time(11, 15, 0)), + difference=F("time") - Value(datetime.time(11, 15, 0)), ) self.assertEqual( queryset.get().difference, - datetime.timedelta(hours=1, minutes=15, seconds=15, microseconds=2345) + datetime.timedelta(hours=1, minutes=15, seconds=15, microseconds=2345), ) queryset = Time.objects.annotate( - difference=F('time') - Value(None, output_field=TimeField()), + difference=F("time") - Value(None, output_field=TimeField()), ) self.assertIsNone(queryset.first().difference) - queryset = Time.objects.annotate(shifted=ExpressionWrapper( - F('time') - Value(None, output_field=DurationField()), - output_field=TimeField(), - )) + queryset = Time.objects.annotate( + shifted=ExpressionWrapper( + F("time") - Value(None, output_field=DurationField()), + output_field=TimeField(), + ) + ) self.assertIsNone(queryset.first().shifted) - @skipUnlessDBFeature('supports_temporal_subtraction') + @skipUnlessDBFeature("supports_temporal_subtraction") def test_time_subquery_subtraction(self): Time.objects.create(time=datetime.time(12, 30, 15, 2345)) - subquery = Time.objects.filter(pk=OuterRef('pk')).values('time') + subquery = Time.objects.filter(pk=OuterRef("pk")).values("time") queryset = Time.objects.annotate( - difference=subquery - F('time'), + difference=subquery - F("time"), ).filter(difference=datetime.timedelta()) self.assertTrue(queryset.exists()) - @skipUnlessDBFeature('supports_temporal_subtraction') + @skipUnlessDBFeature("supports_temporal_subtraction") def test_datetime_subtraction(self): under_estimate = [ - e.name for e in Experiment.objects.filter(estimated_time__gt=F('end') - F('start')) + e.name + for e in Experiment.objects.filter(estimated_time__gt=F("end") - F("start")) ] - self.assertEqual(under_estimate, ['e2']) + self.assertEqual(under_estimate, ["e2"]) over_estimate = [ - e.name for e in Experiment.objects.filter(estimated_time__lt=F('end') - F('start')) + e.name + for e in Experiment.objects.filter(estimated_time__lt=F("end") - F("start")) ] - self.assertEqual(over_estimate, ['e4']) + self.assertEqual(over_estimate, ["e4"]) queryset = Experiment.objects.annotate( - difference=F('start') - Value(None, output_field=DateTimeField()), + difference=F("start") - Value(None, output_field=DateTimeField()), ) self.assertIsNone(queryset.first().difference) - queryset = Experiment.objects.annotate(shifted=ExpressionWrapper( - F('start') - Value(None, output_field=DurationField()), - output_field=DateTimeField(), - )) + queryset = Experiment.objects.annotate( + shifted=ExpressionWrapper( + F("start") - Value(None, output_field=DurationField()), + output_field=DateTimeField(), + ) + ) self.assertIsNone(queryset.first().shifted) - @skipUnlessDBFeature('supports_temporal_subtraction') + @skipUnlessDBFeature("supports_temporal_subtraction") def test_datetime_subquery_subtraction(self): - subquery = Experiment.objects.filter(pk=OuterRef('pk')).values('start') + subquery = Experiment.objects.filter(pk=OuterRef("pk")).values("start") queryset = Experiment.objects.annotate( - difference=subquery - F('start'), + difference=subquery - F("start"), ).filter(difference=datetime.timedelta()) self.assertTrue(queryset.exists()) - @skipUnlessDBFeature('supports_temporal_subtraction') + @skipUnlessDBFeature("supports_temporal_subtraction") def test_datetime_subtraction_microseconds(self): delta = datetime.timedelta(microseconds=8999999999999999) - Experiment.objects.update(end=F('start') + delta) - qs = Experiment.objects.annotate(delta=F('end') - F('start')) + Experiment.objects.update(end=F("start") + delta) + qs = Experiment.objects.annotate(delta=F("end") - F("start")) for e in qs: self.assertEqual(e.delta, delta) @@ -1724,43 +1980,58 @@ class FTimeDeltaTests(TestCase): # Exclude e1 which has very high precision so we can test this on all # backends regardless of whether or not it supports # microsecond_precision. - over_estimate = Experiment.objects.exclude(name='e1').filter( - completed__gt=self.stime + F('estimated_time'), - ).order_by('name') - self.assertQuerysetEqual(over_estimate, ['e3', 'e4', 'e5'], lambda e: e.name) + over_estimate = ( + Experiment.objects.exclude(name="e1") + .filter( + completed__gt=self.stime + F("estimated_time"), + ) + .order_by("name") + ) + self.assertQuerysetEqual(over_estimate, ["e3", "e4", "e5"], lambda e: e.name) def test_duration_with_datetime_microseconds(self): delta = datetime.timedelta(microseconds=8999999999999999) - qs = Experiment.objects.annotate(dt=ExpressionWrapper( - F('start') + delta, - output_field=DateTimeField(), - )) + qs = Experiment.objects.annotate( + dt=ExpressionWrapper( + F("start") + delta, + output_field=DateTimeField(), + ) + ) for e in qs: self.assertEqual(e.dt, e.start + delta) def test_date_minus_duration(self): more_than_4_days = Experiment.objects.filter( - assigned__lt=F('completed') - Value(datetime.timedelta(days=4)) + assigned__lt=F("completed") - Value(datetime.timedelta(days=4)) ) - self.assertQuerysetEqual(more_than_4_days, ['e3', 'e4', 'e5'], lambda e: e.name) + self.assertQuerysetEqual(more_than_4_days, ["e3", "e4", "e5"], lambda e: e.name) def test_negative_timedelta_update(self): # subtract 30 seconds, 30 minutes, 2 hours and 2 days - experiments = Experiment.objects.filter(name='e0').annotate( - start_sub_seconds=F('start') + datetime.timedelta(seconds=-30), - ).annotate( - start_sub_minutes=F('start_sub_seconds') + datetime.timedelta(minutes=-30), - ).annotate( - start_sub_hours=F('start_sub_minutes') + datetime.timedelta(hours=-2), - ).annotate( - new_start=F('start_sub_hours') + datetime.timedelta(days=-2), + experiments = ( + Experiment.objects.filter(name="e0") + .annotate( + start_sub_seconds=F("start") + datetime.timedelta(seconds=-30), + ) + .annotate( + start_sub_minutes=F("start_sub_seconds") + + datetime.timedelta(minutes=-30), + ) + .annotate( + start_sub_hours=F("start_sub_minutes") + datetime.timedelta(hours=-2), + ) + .annotate( + new_start=F("start_sub_hours") + datetime.timedelta(days=-2), + ) ) expected_start = datetime.datetime(2010, 6, 23, 9, 45, 0) # subtract 30 microseconds - experiments = experiments.annotate(new_start=F('new_start') + datetime.timedelta(microseconds=-30)) + experiments = experiments.annotate( + new_start=F("new_start") + datetime.timedelta(microseconds=-30) + ) expected_start += datetime.timedelta(microseconds=+746970) - experiments.update(start=F('new_start')) - e0 = Experiment.objects.get(name='e0') + experiments.update(start=F("new_start")) + e0 = Experiment.objects.get(name="e0") self.assertEqual(e0.start, expected_start) @@ -1772,71 +2043,79 @@ class ValueTests(TestCase): def test_update_UUIDField_using_Value(self): UUID.objects.create() - UUID.objects.update(uuid=Value(uuid.UUID('12345678901234567890123456789012'), output_field=UUIDField())) - self.assertEqual(UUID.objects.get().uuid, uuid.UUID('12345678901234567890123456789012')) + UUID.objects.update( + uuid=Value( + uuid.UUID("12345678901234567890123456789012"), output_field=UUIDField() + ) + ) + self.assertEqual( + UUID.objects.get().uuid, uuid.UUID("12345678901234567890123456789012") + ) def test_deconstruct(self): - value = Value('name') + value = Value("name") path, args, kwargs = value.deconstruct() - self.assertEqual(path, 'django.db.models.Value') + self.assertEqual(path, "django.db.models.Value") self.assertEqual(args, (value.value,)) self.assertEqual(kwargs, {}) def test_deconstruct_output_field(self): - value = Value('name', output_field=CharField()) + value = Value("name", output_field=CharField()) path, args, kwargs = value.deconstruct() - self.assertEqual(path, 'django.db.models.Value') + self.assertEqual(path, "django.db.models.Value") self.assertEqual(args, (value.value,)) self.assertEqual(len(kwargs), 1) - self.assertEqual(kwargs['output_field'].deconstruct(), CharField().deconstruct()) + self.assertEqual( + kwargs["output_field"].deconstruct(), CharField().deconstruct() + ) def test_repr(self): tests = [ - (None, 'Value(None)'), - ('str', "Value('str')"), - (True, 'Value(True)'), - (42, 'Value(42)'), + (None, "Value(None)"), + ("str", "Value('str')"), + (True, "Value(True)"), + (42, "Value(42)"), ( datetime.datetime(2019, 5, 15), - 'Value(datetime.datetime(2019, 5, 15, 0, 0))', + "Value(datetime.datetime(2019, 5, 15, 0, 0))", ), - (Decimal('3.14'), "Value(Decimal('3.14'))"), + (Decimal("3.14"), "Value(Decimal('3.14'))"), ] for value, expected in tests: with self.subTest(value=value): self.assertEqual(repr(Value(value)), expected) def test_equal(self): - value = Value('name') - self.assertEqual(value, Value('name')) - self.assertNotEqual(value, Value('username')) + value = Value("name") + self.assertEqual(value, Value("name")) + self.assertNotEqual(value, Value("username")) def test_hash(self): - d = {Value('name'): 'Bob'} - self.assertIn(Value('name'), d) - self.assertEqual(d[Value('name')], 'Bob') + d = {Value("name"): "Bob"} + self.assertIn(Value("name"), d) + self.assertEqual(d[Value("name")], "Bob") def test_equal_output_field(self): - value = Value('name', output_field=CharField()) - same_value = Value('name', output_field=CharField()) - other_value = Value('name', output_field=TimeField()) - no_output_field = Value('name') + value = Value("name", output_field=CharField()) + same_value = Value("name", output_field=CharField()) + other_value = Value("name", output_field=TimeField()) + no_output_field = Value("name") self.assertEqual(value, same_value) self.assertNotEqual(value, other_value) self.assertNotEqual(value, no_output_field) def test_raise_empty_expressionlist(self): - msg = 'ExpressionList requires at least one expression' + msg = "ExpressionList requires at least one expression" with self.assertRaisesMessage(ValueError, msg): ExpressionList() def test_compile_unresolved(self): # This test might need to be revisited later on if #25425 is enforced. compiler = Time.objects.all().query.get_compiler(connection=connection) - value = Value('foo') - self.assertEqual(value.as_sql(compiler, connection), ('%s', ['foo'])) - value = Value('foo', output_field=CharField()) - self.assertEqual(value.as_sql(compiler, connection), ('%s', ['foo'])) + value = Value("foo") + self.assertEqual(value.as_sql(compiler, connection), ("%s", ["foo"])) + value = Value("foo", output_field=CharField()) + self.assertEqual(value.as_sql(compiler, connection), ("%s", ["foo"])) def test_output_field_decimalfield(self): Time.objects.create() @@ -1845,7 +2124,7 @@ class ValueTests(TestCase): def test_resolve_output_field(self): value_types = [ - ('str', CharField), + ("str", CharField), (True, BooleanField), (42, IntegerField), (3.14, FloatField), @@ -1853,8 +2132,8 @@ class ValueTests(TestCase): (datetime.datetime(2019, 5, 15), DateTimeField), (datetime.time(3, 16), TimeField), (datetime.timedelta(1), DurationField), - (Decimal('3.14'), DecimalField), - (b'', BinaryField), + (Decimal("3.14"), DecimalField), + (b"", BinaryField), (uuid.uuid4(), UUIDField), ] for value, output_field_type in value_types: @@ -1863,7 +2142,7 @@ class ValueTests(TestCase): self.assertIsInstance(expr.output_field, output_field_type) def test_resolve_output_field_failure(self): - msg = 'Cannot resolve expression type, unknown output_field' + msg = "Cannot resolve expression type, unknown output_field" with self.assertRaisesMessage(FieldError, msg): Value(object()).output_field @@ -1874,7 +2153,7 @@ class ValueTests(TestCase): and this demonstrates that they don't throw an exception. """ value_types = [ - 'str', + "str", True, 42, 3.14, @@ -1882,8 +2161,8 @@ class ValueTests(TestCase): datetime.datetime(2019, 5, 15), datetime.time(3, 16), datetime.timedelta(1), - Decimal('3.14'), - b'', + Decimal("3.14"), + b"", uuid.uuid4(), ] for value in value_types: @@ -1895,12 +2174,16 @@ class ValueTests(TestCase): class ExistsTests(TestCase): def test_optimizations(self): with CaptureQueriesContext(connection) as context: - list(Experiment.objects.values(exists=Exists( - Experiment.objects.order_by('pk'), - )).order_by()) + list( + Experiment.objects.values( + exists=Exists( + Experiment.objects.order_by("pk"), + ) + ).order_by() + ) captured_queries = context.captured_queries self.assertEqual(len(captured_queries), 1) - captured_sql = captured_queries[0]['sql'] + captured_sql = captured_queries[0]["sql"] self.assertNotIn( connection.ops.quote_name(Experiment._meta.pk.column), captured_sql, @@ -1909,13 +2192,11 @@ class ExistsTests(TestCase): connection.ops.limit_offset_sql(None, 1), captured_sql, ) - self.assertNotIn('ORDER BY', captured_sql) + self.assertNotIn("ORDER BY", captured_sql) def test_negated_empty_exists(self): manager = Manager.objects.create() - qs = Manager.objects.filter( - ~Exists(Manager.objects.none()) & Q(pk=manager.pk) - ) + qs = Manager.objects.filter(~Exists(Manager.objects.none()) & Q(pk=manager.pk)) self.assertSequenceEqual(qs, [manager]) def test_select_negated_empty_exists(self): @@ -1928,13 +2209,12 @@ class ExistsTests(TestCase): class FieldTransformTests(TestCase): - @classmethod def setUpTestData(cls): cls.sday = sday = datetime.date(2010, 6, 25) cls.stime = stime = datetime.datetime(2010, 6, 25, 12, 15, 30, 747000) cls.ex1 = Experiment.objects.create( - name='Experiment 1', + name="Experiment 1", assigned=sday, completed=sday + datetime.timedelta(2), estimated_time=datetime.timedelta(2), @@ -1944,96 +2224,118 @@ class FieldTransformTests(TestCase): def test_month_aggregation(self): self.assertEqual( - Experiment.objects.aggregate(month_count=Count('assigned__month')), - {'month_count': 1} + Experiment.objects.aggregate(month_count=Count("assigned__month")), + {"month_count": 1}, ) def test_transform_in_values(self): self.assertSequenceEqual( - Experiment.objects.values('assigned__month'), - [{'assigned__month': 6}], + Experiment.objects.values("assigned__month"), + [{"assigned__month": 6}], ) def test_multiple_transforms_in_values(self): self.assertSequenceEqual( - Experiment.objects.values('end__date__month'), - [{'end__date__month': 6}], + Experiment.objects.values("end__date__month"), + [{"end__date__month": 6}], ) class ReprTests(SimpleTestCase): - def test_expressions(self): self.assertEqual( repr(Case(When(a=1))), - "<Case: CASE WHEN <Q: (AND: ('a', 1))> THEN Value(None), ELSE Value(None)>" + "<Case: CASE WHEN <Q: (AND: ('a', 1))> THEN Value(None), ELSE Value(None)>", + ) + self.assertEqual( + repr(When(Q(age__gte=18), then=Value("legal"))), + "<When: WHEN <Q: (AND: ('age__gte', 18))> THEN Value('legal')>", + ) + self.assertEqual(repr(Col("alias", "field")), "Col(alias, field)") + self.assertEqual(repr(F("published")), "F(published)") + self.assertEqual( + repr(F("cost") + F("tax")), "<CombinedExpression: F(cost) + F(tax)>" ) self.assertEqual( - repr(When(Q(age__gte=18), then=Value('legal'))), - "<When: WHEN <Q: (AND: ('age__gte', 18))> THEN Value('legal')>" + repr(ExpressionWrapper(F("cost") + F("tax"), IntegerField())), + "ExpressionWrapper(F(cost) + F(tax))", ) - self.assertEqual(repr(Col('alias', 'field')), "Col(alias, field)") - self.assertEqual(repr(F('published')), "F(published)") - self.assertEqual(repr(F('cost') + F('tax')), "<CombinedExpression: F(cost) + F(tax)>") self.assertEqual( - repr(ExpressionWrapper(F('cost') + F('tax'), IntegerField())), - "ExpressionWrapper(F(cost) + F(tax))" + repr(Func("published", function="TO_CHAR")), + "Func(F(published), function=TO_CHAR)", + ) + self.assertEqual(repr(OrderBy(Value(1))), "OrderBy(Value(1), descending=False)") + self.assertEqual(repr(RawSQL("table.col", [])), "RawSQL(table.col, [])") + self.assertEqual( + repr(Ref("sum_cost", Sum("cost"))), "Ref(sum_cost, Sum(F(cost)))" ) - self.assertEqual(repr(Func('published', function='TO_CHAR')), "Func(F(published), function=TO_CHAR)") - self.assertEqual(repr(OrderBy(Value(1))), 'OrderBy(Value(1), descending=False)') - self.assertEqual(repr(RawSQL('table.col', [])), "RawSQL(table.col, [])") - self.assertEqual(repr(Ref('sum_cost', Sum('cost'))), "Ref(sum_cost, Sum(F(cost)))") self.assertEqual(repr(Value(1)), "Value(1)") self.assertEqual( - repr(ExpressionList(F('col'), F('anothercol'))), - 'ExpressionList(F(col), F(anothercol))' + repr(ExpressionList(F("col"), F("anothercol"))), + "ExpressionList(F(col), F(anothercol))", ) self.assertEqual( - repr(ExpressionList(OrderBy(F('col'), descending=False))), - 'ExpressionList(OrderBy(F(col), descending=False))' + repr(ExpressionList(OrderBy(F("col"), descending=False))), + "ExpressionList(OrderBy(F(col), descending=False))", ) def test_functions(self): - self.assertEqual(repr(Coalesce('a', 'b')), "Coalesce(F(a), F(b))") - self.assertEqual(repr(Concat('a', 'b')), "Concat(ConcatPair(F(a), F(b)))") - self.assertEqual(repr(Length('a')), "Length(F(a))") - self.assertEqual(repr(Lower('a')), "Lower(F(a))") - self.assertEqual(repr(Substr('a', 1, 3)), "Substr(F(a), Value(1), Value(3))") - self.assertEqual(repr(Upper('a')), "Upper(F(a))") + self.assertEqual(repr(Coalesce("a", "b")), "Coalesce(F(a), F(b))") + self.assertEqual(repr(Concat("a", "b")), "Concat(ConcatPair(F(a), F(b)))") + self.assertEqual(repr(Length("a")), "Length(F(a))") + self.assertEqual(repr(Lower("a")), "Lower(F(a))") + self.assertEqual(repr(Substr("a", 1, 3)), "Substr(F(a), Value(1), Value(3))") + self.assertEqual(repr(Upper("a")), "Upper(F(a))") def test_aggregates(self): - self.assertEqual(repr(Avg('a')), "Avg(F(a))") - self.assertEqual(repr(Count('a')), "Count(F(a))") - self.assertEqual(repr(Count('*')), "Count('*')") - self.assertEqual(repr(Max('a')), "Max(F(a))") - self.assertEqual(repr(Min('a')), "Min(F(a))") - self.assertEqual(repr(StdDev('a')), "StdDev(F(a), sample=False)") - self.assertEqual(repr(Sum('a')), "Sum(F(a))") - self.assertEqual(repr(Variance('a', sample=True)), "Variance(F(a), sample=True)") + self.assertEqual(repr(Avg("a")), "Avg(F(a))") + self.assertEqual(repr(Count("a")), "Count(F(a))") + self.assertEqual(repr(Count("*")), "Count('*')") + self.assertEqual(repr(Max("a")), "Max(F(a))") + self.assertEqual(repr(Min("a")), "Min(F(a))") + self.assertEqual(repr(StdDev("a")), "StdDev(F(a), sample=False)") + self.assertEqual(repr(Sum("a")), "Sum(F(a))") + self.assertEqual( + repr(Variance("a", sample=True)), "Variance(F(a), sample=True)" + ) def test_distinct_aggregates(self): - self.assertEqual(repr(Count('a', distinct=True)), "Count(F(a), distinct=True)") - self.assertEqual(repr(Count('*', distinct=True)), "Count('*', distinct=True)") + self.assertEqual(repr(Count("a", distinct=True)), "Count(F(a), distinct=True)") + self.assertEqual(repr(Count("*", distinct=True)), "Count('*', distinct=True)") def test_filtered_aggregates(self): filter = Q(a=1) - self.assertEqual(repr(Avg('a', filter=filter)), "Avg(F(a), filter=(AND: ('a', 1)))") - self.assertEqual(repr(Count('a', filter=filter)), "Count(F(a), filter=(AND: ('a', 1)))") - self.assertEqual(repr(Max('a', filter=filter)), "Max(F(a), filter=(AND: ('a', 1)))") - self.assertEqual(repr(Min('a', filter=filter)), "Min(F(a), filter=(AND: ('a', 1)))") - self.assertEqual(repr(StdDev('a', filter=filter)), "StdDev(F(a), filter=(AND: ('a', 1)), sample=False)") - self.assertEqual(repr(Sum('a', filter=filter)), "Sum(F(a), filter=(AND: ('a', 1)))") self.assertEqual( - repr(Variance('a', sample=True, filter=filter)), - "Variance(F(a), filter=(AND: ('a', 1)), sample=True)" + repr(Avg("a", filter=filter)), "Avg(F(a), filter=(AND: ('a', 1)))" + ) + self.assertEqual( + repr(Count("a", filter=filter)), "Count(F(a), filter=(AND: ('a', 1)))" + ) + self.assertEqual( + repr(Max("a", filter=filter)), "Max(F(a), filter=(AND: ('a', 1)))" + ) + self.assertEqual( + repr(Min("a", filter=filter)), "Min(F(a), filter=(AND: ('a', 1)))" + ) + self.assertEqual( + repr(StdDev("a", filter=filter)), + "StdDev(F(a), filter=(AND: ('a', 1)), sample=False)", + ) + self.assertEqual( + repr(Sum("a", filter=filter)), "Sum(F(a), filter=(AND: ('a', 1)))" + ) + self.assertEqual( + repr(Variance("a", sample=True, filter=filter)), + "Variance(F(a), filter=(AND: ('a', 1)), sample=True)", ) self.assertEqual( - repr(Count('a', filter=filter, distinct=True)), "Count(F(a), distinct=True, filter=(AND: ('a', 1)))" + repr(Count("a", filter=filter, distinct=True)), + "Count(F(a), distinct=True, filter=(AND: ('a', 1)))", ) class CombinableTests(SimpleTestCase): - bitwise_msg = 'Use .bitand() and .bitor() for bitwise logical operations.' + bitwise_msg = "Use .bitand() and .bitor() for bitwise logical operations." def test_negation(self): c = Combinable() @@ -2069,7 +2371,9 @@ class CombinedExpressionTests(SimpleTestCase): connectors = [Combinable.ADD, Combinable.SUB, Combinable.MUL, Combinable.DIV] for lhs, rhs, combined in tests: for connector in connectors: - with self.subTest(lhs=lhs, connector=connector, rhs=rhs, combined=combined): + with self.subTest( + lhs=lhs, connector=connector, rhs=rhs, combined=combined + ): expr = CombinedExpression( Expression(lhs()), connector, @@ -2084,7 +2388,7 @@ class ExpressionWrapperTests(SimpleTestCase): self.assertEqual(expr.get_group_by_cols(alias=None), []) def test_non_empty_group_by(self): - value = Value('f') + value = Value("f") value.output_field = None expr = ExpressionWrapper(Lower(value), output_field=IntegerField()) group_by_cols = expr.get_group_by_cols(alias=None) @@ -2095,20 +2399,20 @@ class ExpressionWrapperTests(SimpleTestCase): class OrderByTests(SimpleTestCase): def test_equal(self): self.assertEqual( - OrderBy(F('field'), nulls_last=True), - OrderBy(F('field'), nulls_last=True), + OrderBy(F("field"), nulls_last=True), + OrderBy(F("field"), nulls_last=True), ) self.assertNotEqual( - OrderBy(F('field'), nulls_last=True), - OrderBy(F('field'), nulls_last=False), + OrderBy(F("field"), nulls_last=True), + OrderBy(F("field"), nulls_last=False), ) def test_hash(self): self.assertEqual( - hash(OrderBy(F('field'), nulls_last=True)), - hash(OrderBy(F('field'), nulls_last=True)), + hash(OrderBy(F("field"), nulls_last=True)), + hash(OrderBy(F("field"), nulls_last=True)), ) self.assertNotEqual( - hash(OrderBy(F('field'), nulls_last=True)), - hash(OrderBy(F('field'), nulls_last=False)), + hash(OrderBy(F("field"), nulls_last=True)), + hash(OrderBy(F("field"), nulls_last=False)), ) |