summaryrefslogtreecommitdiff
path: root/tests/distinct_on_fields/tests.py
blob: f62a32e58d5d3cd25c80b66daff38572b0b943e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from __future__ import absolute_import

from django.db.models import Max
from django.test import TestCase, skipUnlessDBFeature
from django.test.utils import str_prefix

from .models import Tag, Celebrity, Fan, Staff, StaffTag

class DistinctOnTests(TestCase):
    """
    Exercises QuerySet.distinct() called with field names, using the Tag,
    Staff, StaffTag, Celebrity and Fan models of this test app.
    """

    def setUp(self):
        """
        Build the fixture shared by the tests: a small tag tree, four staff
        members, a duplicated staff/tag link and two celebrities with fans.
        """
        # Tag tree: t1 is the root, t2 and t3 hang off t1, t4 and t5 off t3.
        # Rows are created in name order, exactly one create() call each.
        root_tag = Tag.objects.create(name='t1')
        Tag.objects.create(name='t2', parent=root_tag)
        branch_tag = Tag.objects.create(name='t3', parent=root_tag)
        for leaf_name in ('t4', 't5'):
            Tag.objects.create(name=leaf_name, parent=branch_tag)

        # Staff rows get explicit ids; the name "p1" appears in both
        # organisations.
        staff_rows = (
            (1, "p1", "o1"),
            (2, "p2", "o1"),
            (3, "p3", "o1"),
            (4, "p1", "o2"),
        )
        staff = [
            Staff.objects.create(id=pk, name=name, organisation=organisation)
            for pk, name, organisation in staff_rows
        ]
        lead, colleague_a, colleague_b = staff[:3]
        lead.coworkers.add(colleague_a, colleague_b)

        # The same (staff, tag) pair is stored twice on purpose so that a
        # distinct on both fields has something to collapse.
        for _ in range(2):
            StaffTag.objects.create(staff=lead, tag=root_tag)

        first_celeb = Celebrity.objects.create(name="c1")
        second_celeb = Celebrity.objects.create(name="c2")

        # Two fans follow c1 and one follows c2.
        self.fan1, self.fan2, self.fan3 = [
            Fan.objects.create(fan_of=celebrity)
            for celebrity in (first_celeb, first_celeb, second_celeb)
        ]

    @skipUnlessDBFeature('can_distinct_on_fields')
    def test_basic_distinct_on(self):
        """QuerySet.distinct('field', ...) works"""
        # Querysets that need more than one line are named up front; the
        # table below pairs every queryset with the reprs it must yield.
        celebs_with_any_fan = (
            Celebrity.objects
            .filter(fan__in=[self.fan1, self.fan2, self.fan3])
            .distinct('name')
            .order_by('name')
        )

        # Does combining querysets work?
        celebs_of_first_two_fans = (
            Celebrity.objects
            .filter(fan__in=[self.fan1, self.fan2])
            .distinct('name')
            .order_by('name')
        )
        celebs_of_third_fan = (
            Celebrity.objects
            .filter(fan__in=[self.fan3])
            .distinct('name')
            .order_by('name')
        )
        celebs_combined = celebs_of_first_two_fans | celebs_of_third_fan

        # Fetch the alphabetically first coworker for each worker
        first_coworker_per_staff = (
            Staff.objects
            .distinct('id')
            .order_by('id', 'coworkers__name')
            .values_list('id', 'coworkers__name')
        )

        cases = [
            (
                Staff.objects.distinct().order_by('name'),
                ['<Staff: p1>', '<Staff: p1>', '<Staff: p2>', '<Staff: p3>'],
            ),
            (
                Staff.objects.distinct('name').order_by('name'),
                ['<Staff: p1>', '<Staff: p2>', '<Staff: p3>'],
            ),
            (
                Staff.objects.distinct('organisation').order_by('organisation', 'name'),
                ['<Staff: p1>', '<Staff: p1>'],
            ),
            (
                Staff.objects.distinct('name', 'organisation').order_by('name', 'organisation'),
                ['<Staff: p1>', '<Staff: p1>', '<Staff: p2>', '<Staff: p3>'],
            ),
            (
                celebs_with_any_fan,
                ['<Celebrity: c1>', '<Celebrity: c2>'],
            ),
            (
                celebs_combined,
                ['<Celebrity: c1>', '<Celebrity: c2>'],
            ),
            (
                StaffTag.objects.distinct('staff', 'tag'),
                ['<StaffTag: t1 -> p1>'],
            ),
            (
                Tag.objects.order_by('parent__pk', 'pk').distinct('parent'),
                ['<Tag: t2>', '<Tag: t4>', '<Tag: t1>'],
            ),
            (
                StaffTag.objects.select_related('staff').distinct('staff__name').order_by('staff__name'),
                ['<StaffTag: t1 -> p1>'],
            ),
            (
                first_coworker_per_staff,
                [
                    str_prefix("(1, %(_)s'p2')"),
                    str_prefix("(2, %(_)s'p1')"),
                    str_prefix("(3, %(_)s'p1')"),
                    "(4, None)",
                ],
            ),
        ]
        for queryset, expected_reprs in cases:
            # Check the rows first, then that count() agrees with them.
            self.assertQuerysetEqual(queryset, expected_reprs)
            self.assertEqual(queryset.count(), len(expected_reprs))

        # Combining queries with different distinct_fields is not allowed.
        celebrities = Celebrity.objects.all()

        def combine_mismatched():
            return celebrities.distinct('id') & celebrities.distinct('name')

        self.assertRaisesMessage(
            AssertionError,
            "Cannot combine queries with different distinct fields.",
            combine_mismatched
        )

        # Test join unreffing: the OUTER JOIN present for the related-field
        # distinct must be gone once the distinct fields are replaced by 'pk'.
        distinct_via_fan = Celebrity.objects.distinct('greatest_fan__id', 'greatest_fan__fan_of')
        self.assertIn('OUTER JOIN', str(distinct_via_fan.query))
        distinct_via_pk = distinct_via_fan.distinct('pk')
        self.assertNotIn('OUTER JOIN', str(distinct_via_pk.query))

    @skipUnlessDBFeature('can_distinct_on_fields')
    def test_distinct_not_implemented_checks(self):
        # distinct + annotate not allowed, whichever is applied first
        with self.assertRaises(NotImplementedError):
            annotate_then_distinct = Celebrity.objects.annotate(Max('id')).distinct('id')
            annotate_then_distinct[0]
        with self.assertRaises(NotImplementedError):
            distinct_then_annotate = Celebrity.objects.distinct('id').annotate(Max('id'))
            distinct_then_annotate[0]

        # However this check is done only when the query executes, so you
        # can use distinct() to remove the fields before execution.
        fields_cleared = Celebrity.objects.distinct('id').annotate(Max('id')).distinct()
        fields_cleared[0]

        # distinct + aggregate not allowed
        with self.assertRaises(NotImplementedError):
            Celebrity.objects.distinct('id').aggregate(Max('id'))