summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2011-09-22 19:53:21 +0200
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2011-09-22 19:56:58 +0200
commit2f9ceeac64205e9632c1072fc907b49dba433999 (patch)
tree8412b3256d994efb9e0fe83c030102b443982146
parentc4e6d7d982e80d3c9c11efef5b60c54c9675b71f (diff)
downloadpsycopg2-2f9ceeac64205e9632c1072fc907b49dba433999.tar.gz
Added support for arrays of hstores
-rw-r--r--NEWS1
-rw-r--r--lib/extras.py30
-rwxr-xr-xtests/types_extras.py59
3 files changed, 84 insertions, 6 deletions
diff --git a/NEWS b/NEWS
index ec70072..cec9a0d 100644
--- a/NEWS
+++ b/NEWS
@@ -3,6 +3,7 @@ What's new in psycopg 2.4.3
- Added 'new_array_type()' function for easy creation of array
typecasters.
+ - Added support for arrays of hstores (ticket #66).
- Fixed segfault in case of transaction started with connection lost
(and possibly other events).
- Rollback connections in transaction or in error before putting them
diff --git a/lib/extras.py b/lib/extras.py
index c6a2504..0ef44a0 100644
--- a/lib/extras.py
+++ b/lib/extras.py
@@ -699,7 +699,8 @@ WHERE typname = 'hstore';
return tuple(rv0), tuple(rv1)
-def register_hstore(conn_or_curs, globally=False, unicode=False, oid=None):
+def register_hstore(conn_or_curs, globally=False, unicode=False,
+ oid=None, array_oid=None):
"""Register adapter and typecaster for `!dict`\-\ |hstore| conversions.
:param conn_or_curs: a connection or cursor: the typecaster will be
@@ -709,14 +710,18 @@ def register_hstore(conn_or_curs, globally=False, unicode=False, oid=None):
will be `!unicode` instead of `!str`. The option is not available on
Python 3
:param oid: the OID of the |hstore| type if known. If not, it will be
- queried on *conn_or_curs*
+ queried on *conn_or_curs*.
+ :param array_oid: the OID of the |hstore| array type if known. If not, it
+ will be queried on *conn_or_curs*.
The connection or cursor passed to the function will be used to query the
database and look for the OID of the |hstore| type (which may be different
across databases). If querying is not desirable (e.g. with
:ref:`asynchronous connections <async-support>`) you may specify it in the
- *oid* parameter (it can be found using a query such as :sql:`SELECT
- 'hstore'::regtype::oid;`).
+ *oid* parameter, which can be found using a query such as :sql:`SELECT
+ 'hstore'::regtype::oid`. Analogously you can obtain a value for *array_oid*
+ using a query such as :sql:`SELECT 'hstore[]'::regtype::oid`.
+
Note that, when passing a dictionary from Python to the database, both
strings and unicode keys and values are supported. Dictionaries returned
@@ -730,6 +735,10 @@ def register_hstore(conn_or_curs, globally=False, unicode=False, oid=None):
added the *oid* parameter. If not specified, the typecaster is
installed also if |hstore| is not installed in the :sql:`public`
schema.
+
+ .. versionchanged:: 2.4.3
+ added support for |hstore| array.
+
"""
if oid is None:
oid = HstoreAdapter.get_oids(conn_or_curs)
@@ -738,11 +747,18 @@ def register_hstore(conn_or_curs, globally=False, unicode=False, oid=None):
"hstore type not found in the database. "
"please install it from your 'contrib/hstore.sql' file")
else:
- oid = oid[0] # for the moment we don't have a HSTOREARRAY
+ array_oid = oid[1]
+ oid = oid[0]
if isinstance(oid, int):
oid = (oid,)
+ if array_oid is not None:
+ if isinstance(array_oid, int):
+ array_oid = (array_oid,)
+ else:
+ array_oid = tuple([x for x in array_oid if x])
+
# create and register the typecaster
if sys.version_info[0] < 3 and unicode:
cast = HstoreAdapter.parse_unicode
@@ -753,6 +769,10 @@ def register_hstore(conn_or_curs, globally=False, unicode=False, oid=None):
_ext.register_type(HSTORE, not globally and conn_or_curs or None)
_ext.register_adapter(dict, HstoreAdapter)
+ if array_oid:
+ HSTOREARRAY = _ext.new_array_type(array_oid, "HSTOREARRAY", HSTORE)
+ _ext.register_type(HSTOREARRAY, not globally and conn_or_curs or None)
+
class CompositeCaster(object):
"""Helps conversion of a PostgreSQL composite type into a Python object.
diff --git a/tests/types_extras.py b/tests/types_extras.py
index d6b5726..7c0ca91 100755
--- a/tests/types_extras.py
+++ b/tests/types_extras.py
@@ -22,7 +22,7 @@ import re
import sys
from datetime import date
-from testutils import unittest, skip_if_no_uuid
+from testutils import unittest, skip_if_no_uuid, skip_before_postgres
import psycopg2
import psycopg2.extras
@@ -357,6 +357,63 @@ class HstoreTestCase(unittest.TestCase):
finally:
psycopg2.extensions.string_types.pop(oid)
+ @skip_if_no_hstore
+ @skip_before_postgres(8, 3)
+ def test_roundtrip_array(self):
+ from psycopg2.extras import register_hstore
+ register_hstore(self.conn)
+
+ ds = []
+ ds.append({})
+ ds.append({'a': 'b', 'c': None})
+
+ ab = map(chr, range(32, 128))
+ ds.append(dict(zip(ab, ab)))
+ ds.append({''.join(ab): ''.join(ab)})
+
+ self.conn.set_client_encoding('latin1')
+ if sys.version_info[0] < 3:
+ ab = map(chr, range(32, 127) + range(160, 255))
+ else:
+ ab = bytes(range(32, 127) + range(160, 255)).decode('latin1')
+
+ ds.append({''.join(ab): ''.join(ab)})
+ ds.append(dict(zip(ab, ab)))
+
+ cur = self.conn.cursor()
+ cur.execute("select %s", (ds,))
+ ds1 = cur.fetchone()[0]
+ self.assertEqual(ds, ds1)
+
+ @skip_if_no_hstore
+ @skip_before_postgres(8, 3)
+ def test_array_cast(self):
+ from psycopg2.extras import register_hstore
+ register_hstore(self.conn)
+ cur = self.conn.cursor()
+ cur.execute("select array['a=>1'::hstore, 'b=>2'::hstore];")
+ a = cur.fetchone()[0]
+ self.assertEqual(a, [{'a': '1'}, {'b': '2'}])
+
+ @skip_if_no_hstore
+ def test_array_cast_oid(self):
+ cur = self.conn.cursor()
+ cur.execute("select 'hstore'::regtype::oid, 'hstore[]'::regtype::oid")
+ oid, aoid = cur.fetchone()
+
+ from psycopg2.extras import register_hstore
+ register_hstore(None, globally=True, oid=oid, array_oid=aoid)
+ try:
+ cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore, '{a=>b}'::hstore[]")
+ t = cur.fetchone()
+ self.assert_(t[0] is None)
+ self.assertEqual(t[1], {})
+ self.assertEqual(t[2], {'a': 'b'})
+ self.assertEqual(t[3], [{'a': 'b'}])
+
+ finally:
+ psycopg2.extensions.string_types.pop(oid)
+ psycopg2.extensions.string_types.pop(aoid)
def skip_if_no_composite(f):
def skip_if_no_composite_(self):