summaryrefslogtreecommitdiff
path: root/django/contrib/gis/db/models/sql/where.py
blob: 0b8101300cb330e9e46c946d5b7f368cb1abb93a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from django.db import connection
from django.db.models.fields import Field, FieldDoesNotExist
from django.db.models.sql.constants import LOOKUP_SEP
from django.db.models.sql.expressions import SQLEvaluator
from django.db.models.sql.where import WhereNode
from django.contrib.gis.db.backend import get_geo_where_clause, SpatialBackend
from django.contrib.gis.db.models.fields import GeometryField
# Module-level shortcut to the quoting function of the imported
# `connection`; `GeoWhereNode.add` maps it over both parts of a geometry
# column reference before joining them with a dot.
qn = connection.ops.quote_name

class GeoAnnotation(object):
    """
    Lightweight record stored alongside geometry lookups in the where tree.

    It carries what the spatial backend's `get_geo_where_clause` is handed
    later on: two attributes copied off the field (`geodetic` and
    `geom_type`), the lookup value, and the where-clause substitutions.
    """
    def __init__(self, field, value, where):
        # Spatial metadata copied straight off the field.
        self.geodetic, self.geom_type = field.geodetic, field.geom_type
        # The substitutions are frozen into a tuple so they can be used
        # directly as the right-hand operand of a `%` string format.
        self.value, self.where = value, tuple(where)

class GeoWhereNode(WhereNode):
    """
    Used to represent the SQL where-clause for spatial databases --
    these are tied to the GeoQuery class that created it.
    """
    def add(self, data, connector):
        """
        Add a constraint to this node, joined with `connector`.

        This is overridden from the regular WhereNode to handle the
        peculiarities of GeometryFields, because they need a special
        annotation object that contains the spatial metadata from the
        field to generate the spatial SQL.

        `data` is either a non-sequence child, which is passed through
        untouched, or an `(obj, lookup_type, value)` triple in which `obj`
        exposes `alias`, `col` and `field` attributes.

        Raises ValueError when `value` is an `SQLEvaluator` whose expression
        does not resolve to a geographic field.
        """
        if not isinstance(data, (list, tuple)):
            # NOTE: `super(WhereNode, self)` resolves `add` on the class
            # that follows WhereNode in the MRO, so `WhereNode.add` itself
            # is skipped here -- presumably on purpose; confirm against
            # `WhereNode.add` before changing.
            return super(WhereNode, self).add(data, connector)

        obj, lookup_type, value = data
        col, field = obj.col, obj.field

        if not hasattr(field, "geom_type"):
            # Not a geographic field, so call `WhereNode.add`.
            return super(GeoWhereNode, self).add(data, connector)

        if isinstance(value, SQLEvaluator):
            # Getting the geographic field to compare with from the expression.
            geo_fld = self._check_geo_field(value.opts, value.expression.name)
            if not geo_fld:
                raise ValueError('No geographic field found in expression.')

            # Get the SRID of the geometry field that the expression was
            # meant to operate on -- it's needed to determine whether
            # transformation SQL is necessary.
            srid = geo_fld.srid

            # Getting the quoted representation of the geometry column that
            # the expression is operating on.
            geo_col = '%s.%s' % tuple(map(qn, value.cols[value.expression]))

            # If it's in a different SRID (and the backend provides a
            # transform function), wrap the column in transformation SQL.
            if srid is not None and srid != field.srid and SpatialBackend.transform:
                placeholder = '%s(%%s, %s)' % (SpatialBackend.transform, field.srid)
            else:
                placeholder = '%s'

            # Setting these up as if we had called `field.get_db_prep_lookup()`.
            where = [placeholder % geo_col]
            params = ()
        else:
            # `GeometryField.get_db_prep_lookup` returns a where clause
            # substitution array in addition to the parameters.
            where, params = field.get_db_prep_lookup(lookup_type, value)

        # The annotation will be a `GeoAnnotation` object that
        # will contain the necessary geometry field metadata for
        # the `get_geo_where_clause` to construct the appropriate
        # spatial SQL when `make_atom` is called.
        annotation = GeoAnnotation(field, value, where)

        # As above, `WhereNode.add` is skipped: the child stored here is
        # already the 4-tuple that `make_atom` unpacks.
        child = ((obj.alias, col, field.db_type()), lookup_type, annotation, params)
        return super(WhereNode, self).add(child, connector)

    def make_atom(self, child, qn, connection):
        """
        Turn a `(obj, lookup_type, value_annot, params)` child into an
        `(sql, params)` pair.

        Children whose annotation is not a `GeoAnnotation` are delegated to
        the base class.  Raises TypeError when a geographic child uses a
        lookup type that is not in `SpatialBackend.gis_terms`.
        """
        obj, lookup_type, value_annot, params = child

        if not isinstance(value_annot, GeoAnnotation):
            # If not a GeometryField, call the `make_atom` from the
            # base class.
            return super(GeoWhereNode, self).make_atom(child, qn, connection)

        if lookup_type not in SpatialBackend.gis_terms:
            raise TypeError('Invalid lookup type: %r' % lookup_type)

        # Getting the geographic where clause template and filling it in
        # with the substitutions recorded on the annotation.
        alias, col, _db_type = obj
        gwc = get_geo_where_clause(alias, col, lookup_type, value_annot)
        return gwc % value_annot.where, params

    @classmethod
    def _check_geo_field(cls, opts, lookup):
        """
        Utility for checking the given lookup with the given model options.
        The lookup is a string either specifying the geographic field, e.g.
        'point', 'the_geom', or a related lookup on a geographic field like
        'address__point'.

        If a GeometryField exists according to the given lookup on the model
        options, it will be returned.  Otherwise returns False.
        """
        # This takes into account the situation where the lookup is a
        # lookup to a related geographic field, e.g., 'address__point'.
        field_names = lookup.split(LOOKUP_SEP)

        try:
            geo_fld = opts.get_field(field_names[0])
            # Any remaining names mean the lookup was for a geometry field
            # across a relationship -- thus we keep on getting the related
            # model options and the model field associated with the next
            # name until there's no more left.
            for fld_name in field_names[1:]:
                opts = geo_fld.rel.to._meta
                geo_fld = opts.get_field(fld_name)
        except (FieldDoesNotExist, AttributeError):
            # Unknown field name, or a field in the chain with no usable
            # `rel.to._meta` to follow.
            return False

        # Finally, make sure we got a Geographic field and return.
        if isinstance(geo_fld, GeometryField):
            return geo_fld
        return False