diff options
| -rw-r--r-- | NEWS | 1 | ||||
| -rw-r--r-- | doc/src/extras.rst | 83 | ||||
| -rw-r--r-- | lib/_range.py | 468 | ||||
| -rw-r--r-- | lib/extras.py | 7 | ||||
| -rwxr-xr-x | tests/test_types_extras.py | 493 |
5 files changed, 1051 insertions, 1 deletions
@@ -2,6 +2,7 @@ What's new in psycopg 2.4.6 --------------------------- - Added JSON adaptation. + - Added support for PostgreSQL 9.2 range types. - Added support for backward scrollable cursors. Thanks to Jon Nelson for the initial patch (ticket #108). - Added a simple way to customize casting of composite types into Python diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 6f5bf22..fcb5a18 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -374,6 +374,89 @@ requires no adapter registration. .. index:: + pair: range; Data types + +Range data types +^^^^^^^^^^^^^^^^ + +.. versionadded:: 2.4.6 + +Psycopg offers a `Range` Python type and supports adaptation between them and +PostgreSQL |range|_ types. Builtin |range| types are supported out-of-the-box; +user-defined |range| types can be adapted using `register_range()`. + +.. |range| replace:: :sql:`range` +.. _range: http://www.postgresql.org/docs/current/static/rangetypes.html + +.. autoclass:: Range + + This Python type is only used to pass and retrieve range values to and + from PostgreSQL and doesn't attempt to replicate the PostgreSQL range + features: it doesn't perform normalization and doesn't implement all the + operators__ supported by the database. + + .. __: http://www.postgresql.org/docs/current/static/functions-range.html#RANGE-OPERATORS-TABLE + + `!Range` objects are immutable, hashable, and support the ``in`` operator + (checking if an element is within the range). They can be tested for + equivalence but not for ordering. Empty ranges evaluate to `!False` in + boolean context, nonempty evaluate to `!True`. + + Although it is possible to instantiate `!Range` objects, the class doesn't + have an adapter registered, so you cannot normally pass these instances as + query arguments. To use range objects as query arguments you can either + use one of the provided subclasses, such as `NumericRange` or create a + custom subclass using `register_range()`. + + Object attributes: + + .. autoattribute:: isempty + .. autoattribute:: lower + .. autoattribute:: upper + .. autoattribute:: lower_inc + .. autoattribute:: upper_inc + .. autoattribute:: lower_inf + .. autoattribute:: upper_inf + + +The following `Range` subclasses map builtin PostgreSQL |range| types to +Python objects: they have an adapter registered so their instances can be +passed as query arguments. |range| values read from database queries are +automatically casted into instances of these classes. + +.. autoclass:: NumericRange +.. autoclass:: DateRange +.. autoclass:: DateTimeRange +.. autoclass:: DateTimeTZRange + +Custom |range| types (created with |CREATE TYPE|_ :sql:`... AS RANGE`) can be +adapted to a custom `Range` subclass: + +.. autofunction:: register_range + +.. autoclass:: RangeCaster + + Object attributes: + + .. attribute:: range + + The `!Range` subclass adapted. + + .. attribute:: adapter + + The `~psycopg2.extensions.ISQLQuote` responsible to adapt `!range`. + + .. attribute:: typecaster + + The object responsible for casting. + + .. attribute:: array_typecaster + + The object responsible to cast arrays, if available, else `!None`. + + + +.. index:: pair: UUID; Data types UUID data type diff --git a/lib/_range.py b/lib/_range.py new file mode 100644 index 0000000..0f99c27 --- /dev/null +++ b/lib/_range.py @@ -0,0 +1,468 @@ +"""Implementation of the Range type and adaptation + +""" + +# psycopg/_range.py - Implementation of the Range type and adaptation +# +# Copyright (C) 2012 Daniele Varrazzo <daniele.varrazzo@gmail.com> +# +# psycopg2 is free software: you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# In addition, as a special exception, the copyright holders give +# permission to link this program with the OpenSSL library (or with +# modified versions of OpenSSL that use the same license as OpenSSL), +# and distribute linked combinations including the two. +# +# You must obey the GNU Lesser General Public License in all respects for +# all of the code used other than OpenSSL. +# +# psycopg2 is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +# License for more details. + +import re + +from psycopg2._psycopg import ProgrammingError, InterfaceError +from psycopg2.extensions import ISQLQuote, adapt, register_adapter +from psycopg2.extensions import new_type, new_array_type, register_type + +class Range(object): + """Python representation for a PostgreSQL |range|_ type. + + :param lower: lower bound for the range. `!None` means unbound + :param upper: upper bound for the range. `!None` means unbound + :param bounds: one of the literal strings ``()``, ``[)``, ``(]``, ``[]``, + representing whether the lower or upper bounds are included + :param empty: if `!True`, the range is empty + + """ + __slots__ = ('_lower', '_upper', '_bounds') + + def __init__(self, lower=None, upper=None, bounds='[)', empty=False): + if not empty: + if bounds not in ('[)', '(]', '()', '[]'): + raise ValueError("bound flags not valid: %r" % bounds) + + self._lower = lower + self._upper = upper + self._bounds = bounds + else: + self._lower = self._upper = self._bounds = None + + def __repr__(self): + if self._bounds is None: + return "%s(empty=True)" % self.__class__.__name__ + else: + return "%s(%r, %r, %r)" % (self.__class__.__name__, + self._lower, self._upper, self._bounds) + + @property + def lower(self): + """The lower bound of the range. `!None` if empty or unbound.""" + return self._lower + + @property + def upper(self): + """The upper bound of the range. `!None` if empty or unbound.""" + return self._upper + + @property + def isempty(self): + """`!True` if the range is empty.""" + return self._bounds is None + + @property + def lower_inf(self): + """`!True` if the range doesn't have a lower bound.""" + if self._bounds is None: return False + return self._lower is None + + @property + def upper_inf(self): + """`!True` if the range doesn't have an upper bound.""" + if self._bounds is None: return False + return self._upper is None + + @property + def lower_inc(self): + """`!True` if the lower bound is included in the range.""" + if self._bounds is None: return False + if self._lower is None: return False + return self._bounds[0] == '[' + + @property + def upper_inc(self): + """`!True` if the upper bound is included in the range.""" + if self._bounds is None: return False + if self._upper is None: return False + return self._bounds[1] == ']' + + def __contains__(self, x): + if self._bounds is None: return False + if self._lower is not None: + if self._bounds[0] == '[': + if x < self._lower: return False + else: + if x <= self._lower: return False + + if self._upper is not None: + if self._bounds[1] == ']': + if x > self._upper: return False + else: + if x >= self._upper: return False + + return True + + def __nonzero__(self): + return self._bounds is not None + + def __eq__(self, other): + return (self._lower == other._lower + and self._upper == other._upper + and self._bounds == other._bounds) + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash((self._lower, self._upper, self._bounds)) + + def __lt__(self, other): + raise TypeError( + 'Range objects cannot be ordered; please refer to the PostgreSQL' + ' documentation to perform this operation in the database') + + __le__ = __gt__ = __ge__ = __lt__ + + +def register_range(pgrange, pyrange, conn_or_curs, globally=False): + """Create and register an adapter and the typecasters to convert between + a PostgreSQL |range|_ type and a PostgreSQL `Range` subclass. + + :param pgrange: the name of the PostgreSQL |range| type. Can be + schema-qualified + :param pyrange: a `Range` strict subclass, or just a name to give to a new + class + :param conn_or_curs: a connection or cursor used to find the oid of the + range and its subtype; the typecaster is registered in a scope limited + to this object, unless *globally* is set to `!True` + :param globally: if `!False` (default) register the typecaster only on + *conn_or_curs*, otherwise register it globally + :return: `RangeCaster` instance responsible for the conversion + + If a string is passed to *pyrange*, a new `Range` subclass is created + with such name and will be available as the `~RangeCaster.range` attribute + of the returned `RangeCaster` object. + + The function queries the database on *conn_or_curs* to inspect the + *pgrange* type. Raise `~psycopg2.ProgrammingError` if the type is not + found. If querying the database is not advisable, use directly the + `RangeCaster` class and register the adapter and typecasters using the + provided functions. + + """ + caster = RangeCaster._from_db(pgrange, pyrange, conn_or_curs) + caster._register(not globally and conn_or_curs or None) + return caster + + +class RangeAdapter(object): + """`ISQLQuote` adapter for `Range` subclasses. + + This is an abstract class: concrete classes must set a `name` class + attribute or override `getquoted()`. + """ + name = None + + def __init__(self, adapted): + self.adapted = adapted + + def __conform__(self, proto): + if self._proto is ISQLQuote: + return self + + def prepare(self, conn): + self._conn = conn + + def getquoted(self): + if self.name is None: + raise NotImplementedError( + 'RangeAdapter must be subclassed overriding its name ' + 'or the getquoted() method') + + r = self.adapted + if r.isempty: + return "'empty'::%s" % self.name + + if r.lower is not None: + a = adapt(r.lower) + if hasattr(a, 'prepare'): + a.prepare(self._conn) + lower = a.getquoted() + else: + lower = 'NULL' + + if r.upper is not None: + a = adapt(r.upper) + if hasattr(a, 'prepare'): + a.prepare(self._conn) + upper = a.getquoted() + else: + upper = 'NULL' + + return "%s(%s, %s, '%s')" % ( + self.name, lower, upper, r._bounds) + + +class RangeCaster(object): + """Helper class to convert between `Range` and PostgreSQL range types. + + Objects of this class are usually created by `register_range()`. Manual + creation could be useful if querying the database is not advisable: in + this case the oids must be provided. + """ + def __init__(self, pgrange, pyrange, oid, subtype_oid, array_oid=None): + self.subtype_oid = subtype_oid + self._create_ranges(pgrange, pyrange) + + name = self.adapter.name or self.adapter.__class__.__name__ + + self.typecaster = new_type((oid,), name, self.parse) + + if array_oid is not None: + self.array_typecaster = new_array_type( + (array_oid,), name + "ARRAY", self.typecaster) + else: + self.array_typecaster = None + + def _create_ranges(self, pgrange, pyrange): + """Create Range and RangeAdapter classes if needed.""" + # if got a string create a new RangeAdapter concrete type (with a name) + # else take it as an adapter. Passing an adapter should be considered + # an implementation detail and is not documented. It is currently used + # for the numeric ranges. + self.adapter = None + if isinstance(pgrange, basestring): + self.adapter = type(pgrange, (RangeAdapter,), {}) + self.adapter.name = pgrange + else: + try: + if issubclass(pgrange, RangeAdapter) and pgrange is not RangeAdapter: + self.adapter = pgrange + except TypeError: + pass + + if self.adapter is None: + raise TypeError( + 'pgrange must be a string or a RangeAdapter strict subclass') + + self.range = None + try: + if isinstance(pyrange, basestring): + self.range = type(pyrange, (Range,), {}) + if issubclass(pyrange, Range) and pyrange is not Range: + self.range = pyrange + except TypeError: + pass + + if self.range is None: + raise TypeError( + 'pyrange must be a type or a Range strict subclass') + + @classmethod + def _from_db(self, name, pyrange, conn_or_curs): + """Return a `RangeCaster` instance for the type *pgrange*. + + Raise `ProgrammingError` if the type is not found. + """ + from psycopg2.extensions import STATUS_IN_TRANSACTION + from psycopg2.extras import _solve_conn_curs + conn, curs = _solve_conn_curs(conn_or_curs) + + if conn.server_version < 90200: + raise ProgrammingError("range types not available in version %s" + % conn.server_version) + + # Store the transaction status of the connection to revert it after use + conn_status = conn.status + + # Use the correct schema + if '.' in name: + schema, tname = name.split('.', 1) + else: + tname = name + schema = 'public' + + # get the type oid and attributes + try: + curs.execute("""\ +select rngtypid, rngsubtype, + (select typarray from pg_type where oid = rngtypid) +from pg_range r +join pg_type t on t.oid = rngtypid +join pg_namespace ns on ns.oid = typnamespace +where typname = %s and ns.nspname = %s; +""", (tname, schema)) + + except ProgrammingError: + if not conn.autocommit: + conn.rollback() + raise + else: + rec = curs.fetchone() + + # revert the status of the connection as before the command + if (conn_status != STATUS_IN_TRANSACTION + and not conn.autocommit): + conn.rollback() + + if not rec: + raise ProgrammingError( + "PostgreSQL type '%s' not found" % name) + + type, subtype, array = rec + + return RangeCaster(name, pyrange, + oid=type, subtype_oid=subtype, array_oid=array) + + _re_range = re.compile(r""" + ( \(|\[ ) # lower bound flag + (?: # lower bound: + " ( (?: [^"] | "")* ) " # - a quoted string + | ( [^",]+ ) # - or an unquoted string + )? # - or empty (not catched) + , + (?: # upper bound: + " ( (?: [^"] | "")* ) " # - a quoted string + | ( [^"\)\]]+ ) # - or an unquoted string + )? # - or empty (not catched) + ( \)|\] ) # upper bound flag + """, re.VERBOSE) + + _re_undouble = re.compile(r'(["\\])\1') + + def parse(self, s, cur=None): + if s is None: + return None + + if s == 'empty': + return self.range(empty=True) + + m = self._re_range.match(s) + if m is None: + raise InterfaceError("failed to parse range: %s") + + lower = m.group(3) + if lower is None: + lower = m.group(2) + if lower is not None: + lower = self._re_undouble.sub(r"\1", lower) + + upper = m.group(5) + if upper is None: + upper = m.group(4) + if upper is not None: + upper = self._re_undouble.sub(r"\1", upper) + + if cur is not None: + lower = cur.cast(self.subtype_oid, lower) + upper = cur.cast(self.subtype_oid, upper) + + bounds = m.group(1) + m.group(6) + + return self.range(lower, upper, bounds) + + def _register(self, scope=None): + register_type(self.typecaster, scope) + if self.array_typecaster is not None: + register_type(self.array_typecaster, scope) + + register_adapter(self.range, self.adapter) + + +class NumericRange(Range): + """A `Range` suitable to pass Python numeric types to a PostgreSQL range. + + PostgreSQL types :sql:`int4range`, :sql:`int8range`, :sql:`numrange` are + casted into `!NumericRange` instances. + """ + pass + +class DateRange(Range): + """Represents :sql:`daterange` values.""" + pass + +class DateTimeRange(Range): + """Represents :sql:`tsrange` values.""" + pass + +class DateTimeTZRange(Range): + """Represents :sql:`tstzrange` values.""" + pass + + +# Special adaptation for NumericRange. Allows to pass number range regardless +# of whether they are ints, floats and what size of ints are, which are +# pointless in Python world. On the way back, no numeric range is casted to +# NumericRange, but only to their subclasses + +class NumberRangeAdapter(RangeAdapter): + """Adapt a range if the subtype doesn't need quotes.""" + def getquoted(self): + r = self.adapted + if r.isempty: + return "'empty'" + + if not r.lower_inf: + # not exactly: we are relying that none of these object is really + # quoted (they are numbers). Also, I'm lazy and not preparing the + # adapter because I assume encoding doesn't matter for these + # objects. + lower = adapt(r.lower).getquoted() + else: + lower = '' + + if not r.upper_inf: + upper = adapt(r.upper).getquoted() + else: + upper = '' + + return "'%s%s,%s%s'" % ( + r._bounds[0], lower, upper, r._bounds[1]) + +# TODO: probably won't work with infs, nans and other tricky cases. +register_adapter(NumericRange, NumberRangeAdapter) + + +# Register globally typecasters and adapters for builtin range types. + +# note: the adapter is registered more than once, but this is harmless. +int4range_caster = RangeCaster(NumberRangeAdapter, NumericRange, + oid=3904, subtype_oid=23, array_oid=3905) +int4range_caster._register() + +int8range_caster = RangeCaster(NumberRangeAdapter, NumericRange, + oid=3926, subtype_oid=20, array_oid=3927) +int8range_caster._register() + +numrange_caster = RangeCaster(NumberRangeAdapter, NumericRange, + oid=3906, subtype_oid=1700, array_oid=3907) +numrange_caster._register() + +daterange_caster = RangeCaster('daterange', DateRange, + oid=3912, subtype_oid=1082, array_oid=3913) +daterange_caster._register() + +tsrange_caster = RangeCaster('tsrange', DateTimeRange, + oid=3908, subtype_oid=1114, array_oid=3909) +tsrange_caster._register() + +tstzrange_caster = RangeCaster('tstzrange', DateTimeTZRange, + oid=3910, subtype_oid=1184, array_oid=3911) +tstzrange_caster._register() + + diff --git a/lib/extras.py b/lib/extras.py index fe8db0a..eed8b32 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -952,5 +952,12 @@ def register_composite(name, conn_or_curs, globally=False, factory=None): # expose the json adaptation stuff into the module from psycopg2._json import json, Json, register_json, register_default_json + +# Expose range-related objects +from psycopg2._range import Range, NumericRange +from psycopg2._range import DateRange, DateTimeRange, DateTimeTZRange +from psycopg2._range import register_range, RangeAdapter, RangeCaster + + __all__ = filter(lambda k: not k.startswith('_'), locals().keys()) diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py index cc05a5f..559d8c9 100755 --- a/tests/test_types_extras.py +++ b/tests/test_types_extras.py @@ -17,9 +17,10 @@ import re import sys from decimal import Decimal -from datetime import date +from datetime import date, datetime from testutils import unittest, skip_if_no_uuid, skip_before_postgres +from testutils import decorate_all_tests import psycopg2 import psycopg2.extras @@ -1055,6 +1056,496 @@ class JsonTestCase(unittest.TestCase): self.assertEqual(data['b'], None) +class RangeTestCase(unittest.TestCase): + def test_noparam(self): + from psycopg2.extras import Range + r = Range() + + self.assert_(not r.isempty) + self.assertEqual(r.lower, None) + self.assertEqual(r.upper, None) + self.assert_(r.lower_inf) + self.assert_(r.upper_inf) + self.assert_(not r.lower_inc) + self.assert_(not r.upper_inc) + + def test_empty(self): + from psycopg2.extras import Range + r = Range(empty=True) + + self.assert_(r.isempty) + self.assertEqual(r.lower, None) + self.assertEqual(r.upper, None) + self.assert_(not r.lower_inf) + self.assert_(not r.upper_inf) + self.assert_(not r.lower_inc) + self.assert_(not r.upper_inc) + + def test_nobounds(self): + from psycopg2.extras import Range + r = Range(10, 20) + self.assertEqual(r.lower, 10) + self.assertEqual(r.upper, 20) + self.assert_(not r.isempty) + self.assert_(not r.lower_inf) + self.assert_(not r.upper_inf) + self.assert_(r.lower_inc) + self.assert_(not r.upper_inc) + + def test_bounds(self): + from psycopg2.extras import Range + for bounds, lower_inc, upper_inc in [ + ('[)', True, False), + ('(]', False, True), + ('()', False, False), + ('[]', True, True),]: + r = Range(10, 20, bounds) + self.assertEqual(r.lower, 10) + self.assertEqual(r.upper, 20) + self.assert_(not r.isempty) + self.assert_(not r.lower_inf) + self.assert_(not r.upper_inf) + self.assertEqual(r.lower_inc, lower_inc) + self.assertEqual(r.upper_inc, upper_inc) + + def test_keywords(self): + from psycopg2.extras import Range + r = Range(upper=20) + self.assertEqual(r.lower, None) + self.assertEqual(r.upper, 20) + self.assert_(not r.isempty) + self.assert_(r.lower_inf) + self.assert_(not r.upper_inf) + self.assert_(not r.lower_inc) + self.assert_(not r.upper_inc) + + r = Range(lower=10, bounds='(]') + self.assertEqual(r.lower, 10) + self.assertEqual(r.upper, None) + self.assert_(not r.isempty) + self.assert_(not r.lower_inf) + self.assert_(r.upper_inf) + self.assert_(not r.lower_inc) + self.assert_(not r.upper_inc) + + def test_bad_bounds(self): + from psycopg2.extras import Range + self.assertRaises(ValueError, Range, bounds='(') + self.assertRaises(ValueError, Range, bounds='[}') + + def test_in(self): + from psycopg2.extras import Range + r = Range(empty=True) + self.assert_(10 not in r) + + r = Range() + self.assert_(10 in r) + + r = Range(lower=10, bounds='[)') + self.assert_(9 not in r) + self.assert_(10 in r) + self.assert_(11 in r) + + r = Range(lower=10, bounds='()') + self.assert_(9 not in r) + self.assert_(10 not in r) + self.assert_(11 in r) + + r = Range(upper=20, bounds='()') + self.assert_(19 in r) + self.assert_(20 not in r) + self.assert_(21 not in r) + + r = Range(upper=20, bounds='(]') + self.assert_(19 in r) + self.assert_(20 in r) + self.assert_(21 not in r) + + r = Range(10, 20) + self.assert_(9 not in r) + self.assert_(10 in r) + self.assert_(11 in r) + self.assert_(19 in r) + self.assert_(20 not in r) + self.assert_(21 not in r) + + r = Range(10, 20, '(]') + self.assert_(9 not in r) + self.assert_(10 not in r) + self.assert_(11 in r) + self.assert_(19 in r) + self.assert_(20 in r) + self.assert_(21 not in r) + + r = Range(20, 10) + self.assert_(9 not in r) + self.assert_(10 not in r) + self.assert_(11 not in r) + self.assert_(19 not in r) + self.assert_(20 not in r) + self.assert_(21 not in r) + + def test_nonzero(self): + from psycopg2.extras import Range + self.assert_(Range()) + self.assert_(Range(10, 20)) + self.assert_(not Range(empty=True)) + + def test_eq_hash(self): + from psycopg2.extras import Range + def assert_equal(r1, r2): + self.assert_(r1 == r2) + self.assert_(hash(r1) == hash(r2)) + + assert_equal(Range(empty=True), Range(empty=True)) + assert_equal(Range(), Range()) + assert_equal(Range(10, None), Range(10, None)) + assert_equal(Range(10, 20), Range(10, 20)) + assert_equal(Range(10, 20), Range(10, 20, '[)')) + assert_equal(Range(10, 20, '[]'), Range(10, 20, '[]')) + + def assert_not_equal(r1, r2): + self.assert_(r1 != r2) + self.assert_(hash(r1) != hash(r2)) + + assert_not_equal(Range(10, 20), Range(10, 21)) + assert_not_equal(Range(10, 20), Range(11, 20)) + assert_not_equal(Range(10, 20, '[)'), Range(10, 20, '[]')) + + def test_not_ordered(self): + from psycopg2.extras import Range + self.assertRaises(TypeError, lambda: Range(empty=True) < Range(0,4)) + self.assertRaises(TypeError, lambda: Range(1,2) > Range(0,4)) + self.assertRaises(TypeError, lambda: Range(1,2) <= Range()) + self.assertRaises(TypeError, lambda: Range(1,2) >= Range()) + + +def skip_if_no_range(f): + def skip_if_no_range_(self): + if self.conn.server_version < 90200: + return self.skipTest( + "server version %s doesn't support range types" + % self.conn.server_version) + + return f(self) + + skip_if_no_range_.__name__ = f.__name__ + return skip_if_no_range_ + + +class RangeCasterTestCase(unittest.TestCase): + def setUp(self): + self.conn = psycopg2.connect(dsn) + + def tearDown(self): + self.conn.close() + + builtin_ranges = ('int4range', 'int8range', 'numrange', + 'daterange', 'tsrange', 'tstzrange') + + def test_cast_null(self): + cur = self.conn.cursor() + for type in self.builtin_ranges: + cur.execute("select NULL::%s" % type) + r = cur.fetchone()[0] + self.assertEqual(r, None) + + def test_cast_empty(self): + from psycopg2.extras import Range + cur = self.conn.cursor() + for type in self.builtin_ranges: + cur.execute("select 'empty'::%s" % type) + r = cur.fetchone()[0] + self.assert_(isinstance(r, Range), type) + self.assert_(r.isempty) + + def test_cast_inf(self): + from psycopg2.extras import Range + cur = self.conn.cursor() + for type in self.builtin_ranges: + cur.execute("select '(,)'::%s" % type) + r = cur.fetchone()[0] + self.assert_(isinstance(r, Range), type) + self.assert_(not r.isempty) + self.assert_(r.lower_inf) + self.assert_(r.upper_inf) + + def test_cast_numbers(self): + from psycopg2.extras import NumericRange + cur = self.conn.cursor() + for type in ('int4range', 'int8range'): + cur.execute("select '(10,20)'::%s" % type) + r = cur.fetchone()[0] + self.assert_(isinstance(r, NumericRange)) + self.assert_(not r.isempty) + self.assertEqual(r.lower, 11) + self.assertEqual(r.upper, 20) + self.assert_(not r.lower_inf) + self.assert_(not r.upper_inf) + self.assert_(r.lower_inc) + self.assert_(not r.upper_inc) + + cur.execute("select '(10.2,20.6)'::numrange") + r = cur.fetchone()[0] + self.assert_(isinstance(r, NumericRange)) + self.assert_(not r.isempty) + self.assertEqual(r.lower, Decimal('10.2')) + self.assertEqual(r.upper, Decimal('20.6')) + self.assert_(not r.lower_inf) + self.assert_(not r.upper_inf) + self.assert_(not r.lower_inc) + self.assert_(not r.upper_inc) + + def test_cast_date(self): + from psycopg2.extras import DateRange + cur = self.conn.cursor() + cur.execute("select '(2000-01-01,2012-12-31)'::daterange") + r = cur.fetchone()[0] + self.assert_(isinstance(r, DateRange)) + self.assert_(not r.isempty) + self.assertEqual(r.lower, date(2000,1,2)) + self.assertEqual(r.upper, date(2012,12,31)) + self.assert_(not r.lower_inf) + self.assert_(not r.upper_inf) + self.assert_(r.lower_inc) + self.assert_(not r.upper_inc) + + def test_cast_timestamp(self): + from psycopg2.extras import DateTimeRange + cur = self.conn.cursor() + ts1 = datetime(2000,1,1) + ts2 = datetime(2000,12,31,23,59,59,999) + cur.execute("select tsrange(%s, %s, '()')", (ts1, ts2)) + r = cur.fetchone()[0] + self.assert_(isinstance(r, DateTimeRange)) + self.assert_(not r.isempty) + self.assertEqual(r.lower, ts1) + self.assertEqual(r.upper, ts2) + self.assert_(not r.lower_inf) + self.assert_(not r.upper_inf) + self.assert_(not r.lower_inc) + self.assert_(not r.upper_inc) + + def test_cast_timestamptz(self): + from psycopg2.extras import DateTimeTZRange + from psycopg2.tz import FixedOffsetTimezone + cur = self.conn.cursor() + ts1 = datetime(2000,1,1, tzinfo=FixedOffsetTimezone(600)) + ts2 = datetime(2000,12,31,23,59,59,999, tzinfo=FixedOffsetTimezone(600)) + cur.execute("select tstzrange(%s, %s, '[]')", (ts1, ts2)) + r = cur.fetchone()[0] + self.assert_(isinstance(r, DateTimeTZRange)) + self.assert_(not r.isempty) + self.assertEqual(r.lower, ts1) + self.assertEqual(r.upper, ts2) + self.assert_(not r.lower_inf) + self.assert_(not r.upper_inf) + self.assert_(r.lower_inc) + self.assert_(r.upper_inc) + + def test_adapt_number_range(self): + from psycopg2.extras import NumericRange + cur = self.conn.cursor() + + r = NumericRange(empty=True) + cur.execute("select %s::int4range", (r,)) + r1 = cur.fetchone()[0] + self.assert_(isinstance(r1, NumericRange)) + self.assert_(r1.isempty) + + r = NumericRange(10, 20) + cur.execute("select %s::int8range", (r,)) + r1 = cur.fetchone()[0] + self.assert_(isinstance(r1, NumericRange)) + self.assertEqual(r1.lower, 10) + self.assertEqual(r1.upper, 20) + self.assert_(r1.lower_inc) + self.assert_(not r1.upper_inc) + + r = NumericRange(10.2, 20.5, '(]') + cur.execute("select %s::numrange", (r,)) + r1 = cur.fetchone()[0] + self.assert_(isinstance(r1, NumericRange)) + self.assertEqual(r1.lower, Decimal('10.2')) + self.assertEqual(r1.upper, Decimal('20.5')) + self.assert_(not r1.lower_inc) + self.assert_(r1.upper_inc) + + def test_adapt_numeric_range(self): + from psycopg2.extras import NumericRange + cur = self.conn.cursor() + + r = NumericRange(empty=True) + cur.execute("select %s::int4range", (r,)) + r1 = cur.fetchone()[0] + self.assert_(isinstance(r1, NumericRange), r1) + self.assert_(r1.isempty) + + r = NumericRange(10, 20) + cur.execute("select %s::int8range", (r,)) + r1 = cur.fetchone()[0] + self.assert_(isinstance(r1, NumericRange)) + self.assertEqual(r1.lower, 10) + self.assertEqual(r1.upper, 20) + self.assert_(r1.lower_inc) + self.assert_(not r1.upper_inc) + + r = NumericRange(10.2, 20.5, '(]') + cur.execute("select %s::numrange", (r,)) + r1 = cur.fetchone()[0] + self.assert_(isinstance(r1, NumericRange)) + self.assertEqual(r1.lower, Decimal('10.2')) + self.assertEqual(r1.upper, Decimal('20.5')) + self.assert_(not r1.lower_inc) + self.assert_(r1.upper_inc) + + def test_adapt_date_range(self): + from psycopg2.extras import DateRange, DateTimeRange, DateTimeTZRange + from psycopg2.tz import FixedOffsetTimezone + cur = self.conn.cursor() + + d1 = date(2012, 01, 01) + d2 = date(2012, 12, 31) + r = DateRange(d1, d2) + cur.execute("select %s", (r,)) + r1 = cur.fetchone()[0] + self.assert_(isinstance(r1, DateRange)) + self.assertEqual(r1.lower, d1) + self.assertEqual(r1.upper, d2) + self.assert_(r1.lower_inc) + self.assert_(not r1.upper_inc) + + r = DateTimeRange(empty=True) + cur.execute("select %s", (r,)) + r1 = cur.fetchone()[0] + self.assert_(isinstance(r1, DateTimeRange)) + self.assert_(r1.isempty) + + ts1 = datetime(2000,1,1, tzinfo=FixedOffsetTimezone(600)) + ts2 = datetime(2000,12,31,23,59,59,999, tzinfo=FixedOffsetTimezone(600)) + r = DateTimeTZRange(ts1, ts2, '(]') + cur.execute("select %s", (r,)) + r1 = cur.fetchone()[0] + self.assert_(isinstance(r1, DateTimeTZRange)) + self.assertEqual(r1.lower, ts1) + self.assertEqual(r1.upper, ts2) + self.assert_(not r1.lower_inc) + self.assert_(r1.upper_inc) + + def test_register_range_adapter(self): + from psycopg2.extras import Range, register_range + cur = self.conn.cursor() + cur.execute("create type textrange as range (subtype=text)") + rc = register_range('textrange', 'TextRange', cur) + + TextRange = rc.range + self.assert_(issubclass(TextRange, Range)) + self.assertEqual(TextRange.__name__, 'TextRange') + + r = TextRange('a', 'b', '(]') + cur.execute("select %s", (r,)) + r1 = cur.fetchone()[0] + self.assertEqual(r1.lower, 'a') + self.assertEqual(r1.upper, 'b') + self.assert_(not r1.lower_inc) + self.assert_(r1.upper_inc) + + cur.execute("select %s", ([r,r,r],)) + rs = cur.fetchone()[0] + self.assertEqual(len(rs), 3) + for r1 in rs: + self.assertEqual(r1.lower, 'a') + self.assertEqual(r1.upper, 'b') + self.assert_(not r1.lower_inc) + self.assert_(r1.upper_inc) + + def test_range_escaping(self): + from psycopg2.extras import register_range + cur = self.conn.cursor() + cur.execute("create type textrange as range (subtype=text)") + rc = register_range('textrange', 'TextRange', cur) + + TextRange = rc.range + cur.execute(""" + create table rangetest ( + id integer primary key, + range textrange)""") + + bounds = [ '[)', '(]', '()', '[]' ] + ranges = [ TextRange(low, up, bounds[i % 4]) + for i, (low, up) in enumerate(zip( + [None] + map(chr, range(1, 128)), + map(chr, range(1,128)) + [None], + ))] + ranges.append(TextRange()) + ranges.append(TextRange(empty=True)) + + errs = 0 + for i, r in enumerate(ranges): + # not all the ranges make sense: + # fun fact: select ascii('#') < ascii('$'), '#' < '$' + # yelds... t, f! At least in en_GB.UTF-8 collation. + # which seems suggesting a supremacy of the pound on the dollar. + # So some of these ranges will fail to insert. Be prepared but... + try: + cur.execute(""" + savepoint x; + insert into rangetest (id, range) values (%s, %s); + """, (i, r)) + except psycopg2.DataError: + errs += 1 + cur.execute("rollback to savepoint x;") + + # ...not too many errors! in the above collate there are 17 errors: + # assume in other collates we won't find more than 30 + self.assert_(errs < 30, + "too many collate errors. Is the test working?") + + cur.execute("select id, range from rangetest order by id") + for i, r in cur: + self.assertEqual(ranges[i].lower, r.lower) + self.assertEqual(ranges[i].upper, r.upper) + self.assertEqual(ranges[i].lower_inc, r.lower_inc) + self.assertEqual(ranges[i].upper_inc, r.upper_inc) + self.assertEqual(ranges[i].lower_inf, r.lower_inf) + self.assertEqual(ranges[i].upper_inf, r.upper_inf) + + def test_range_not_found(self): + from psycopg2.extras import register_range + cur = self.conn.cursor() + self.assertRaises(psycopg2.ProgrammingError, + register_range, 'nosuchrange', 'FailRange', cur) + + def test_schema_range(self): + cur = self.conn.cursor() + cur.execute("create schema rs") + cur.execute("create type r1 as range (subtype=text)") + cur.execute("create type r2 as range (subtype=text)") + cur.execute("create type rs.r2 as range (subtype=text)") + cur.execute("create type rs.r3 as range (subtype=text)") + cur.execute("savepoint x") + + from psycopg2.extras import register_range + ra1 = register_range('r1', 'r1', cur) + ra2 = register_range('r2', 'r2', cur) + rars2 = register_range('rs.r2', 'r2', cur) + rars3 = register_range('rs.r3', 'r3', cur) + + self.assertNotEqual( + ra2.typecaster.values[0], + rars2.typecaster.values[0]) + + self.assertRaises(psycopg2.ProgrammingError, + register_range, 'r3', 'FailRange', cur) + cur.execute("rollback to savepoint x;") + + self.assertRaises(psycopg2.ProgrammingError, + register_range, 'rs.r1', 'FailRange', cur) + cur.execute("rollback to savepoint x;") + +decorate_all_tests(RangeCasterTestCase, skip_if_no_range) + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) |
