summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-11-19 03:55:37 +0000
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-11-19 03:55:37 +0000
commit19ead4a5cb0dcefadb604c872e9f2b93430747f6 (patch)
tree36d2cb98f1fcf5dfa018aae577dd7060c887990d /tests
parent94348bfb78cc2576d586b5f1e6fde3e10c14b178 (diff)
downloadpsycopg2-19ead4a5cb0dcefadb604c872e9f2b93430747f6.tar.gz
Test cleanup.
Tests pass or fail gracefully on older PostgreSQL versions. If unittest2 is available, skip tests instead of printing warnings.
Diffstat (limited to 'tests')
-rwxr-xr-xtests/__init__.py12
-rw-r--r--tests/extras_dictcursor.py10
-rwxr-xr-xtests/test_async.py33
-rw-r--r--tests/test_connection.py52
-rw-r--r--tests/test_copy.py15
-rw-r--r--tests/test_lobject.py67
-rwxr-xr-xtests/test_notify.py6
-rwxr-xr-xtests/test_psycopg2_dbapi20.py11
-rwxr-xr-xtests/test_quote.py9
-rwxr-xr-xtests/test_transaction.py9
-rw-r--r--tests/testutils.py46
-rwxr-xr-xtests/types_basic.py8
-rw-r--r--tests/types_extras.py66
13 files changed, 222 insertions, 122 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
index 1c8f6f2..2319341 100755
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
import os
-import unittest
+from testutils import unittest
dbname = os.environ.get('PSYCOPG2_TESTDB', 'psycopg2_test')
dbhost = os.environ.get('PSYCOPG2_TESTDB_HOST', None)
@@ -59,14 +59,8 @@ def test_suite():
suite.addTest(test_transaction.test_suite())
suite.addTest(types_basic.test_suite())
suite.addTest(types_extras.test_suite())
-
- if not green:
- suite.addTest(test_lobject.test_suite())
- suite.addTest(test_copy.test_suite())
- else:
- import warnings
- warnings.warn("copy/lobjects not implemented in green mode: skipping tests")
-
+ suite.addTest(test_lobject.test_suite())
+ suite.addTest(test_copy.test_suite())
suite.addTest(test_notify.test_suite())
suite.addTest(test_async.test_suite())
suite.addTest(test_green.test_suite())
diff --git a/tests/extras_dictcursor.py b/tests/extras_dictcursor.py
index 6bc5cad..a0583c5 100644
--- a/tests/extras_dictcursor.py
+++ b/tests/extras_dictcursor.py
@@ -16,7 +16,7 @@
import psycopg2
import psycopg2.extras
-import unittest
+from testutils import unittest
import tests
@@ -111,8 +111,7 @@ def if_has_namedtuple(f):
try:
from collections import namedtuple
except ImportError:
- import warnings
- warnings.warn("collections.namedtuple not available")
+ return self.skipTest("collections.namedtuple not available")
else:
return f(self)
@@ -133,8 +132,9 @@ class NamedTupleCursorTest(unittest.TestCase):
connection_factory=NamedTupleConnection)
curs = self.conn.cursor()
curs.execute("CREATE TEMPORARY TABLE nttest (i int, s text)")
- curs.execute(
- "INSERT INTO nttest VALUES (1, 'foo'), (2, 'bar'), (3, 'baz')")
+ curs.execute("INSERT INTO nttest VALUES (1, 'foo')")
+ curs.execute("INSERT INTO nttest VALUES (2, 'bar')")
+ curs.execute("INSERT INTO nttest VALUES (3, 'baz')")
self.conn.commit()
@if_has_namedtuple
diff --git a/tests/test_async.py b/tests/test_async.py
index 9187cec..c839604 100755
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
-import unittest
+from testutils import unittest
import psycopg2
from psycopg2 import extensions
@@ -98,14 +98,13 @@ class AsyncTests(unittest.TestCase):
cur = self.conn.cursor()
try:
cur.callproc("pg_sleep", (0.1, ))
- except psycopg2.ProgrammingError:
- # PG <8.1 did not have pg_sleep
- return
- self.assertTrue(self.conn.isexecuting())
+ self.assertTrue(self.conn.isexecuting())
- self.wait(cur)
- self.assertFalse(self.conn.isexecuting())
- self.assertEquals(cur.fetchall()[0][0], '')
+ self.wait(cur)
+ self.assertFalse(self.conn.isexecuting())
+ self.assertEquals(cur.fetchall()[0][0], '')
+ except psycopg2.ProgrammingError:
+ return self.skipTest("PG < 8.1 did not have pg_sleep")
def test_async_after_async(self):
cur = self.conn.cursor()
@@ -213,7 +212,11 @@ class AsyncTests(unittest.TestCase):
cur.execute("begin")
self.wait(cur)
- cur.execute("insert into table1 values (1), (2), (3)")
+ cur.execute("""
+ insert into table1 values (1);
+ insert into table1 values (2);
+ insert into table1 values (3);
+ """)
self.wait(cur)
cur.execute("select id from table1 order by id")
@@ -247,7 +250,11 @@ class AsyncTests(unittest.TestCase):
def test_async_scroll(self):
cur = self.conn.cursor()
- cur.execute("insert into table1 values (1), (2), (3)")
+ cur.execute("""
+ insert into table1 values (1);
+ insert into table1 values (2);
+ insert into table1 values (3);
+ """)
self.wait(cur)
cur.execute("select id from table1 order by id")
@@ -279,7 +286,11 @@ class AsyncTests(unittest.TestCase):
def test_scroll(self):
cur = self.sync_conn.cursor()
cur.execute("create table table1 (id int)")
- cur.execute("insert into table1 values (1), (2), (3)")
+ cur.execute("""
+ insert into table1 values (1);
+ insert into table1 values (2);
+ insert into table1 values (3);
+ """)
cur.execute("select id from table1 order by id")
cur.scroll(2)
cur.scroll(-1)
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 36ce640..400ddd6 100644
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
-import unittest
+from testutils import unittest, decorate_all_tests
from operator import attrgetter
import psycopg2
@@ -82,13 +82,24 @@ class ConnectionTests(unittest.TestCase):
conn = self.connect()
self.assert_(conn.protocol_version in (2,3), conn.protocol_version)
+ def test_tpc_unsupported(self):
+ cnn = self.connect()
+ if cnn.server_version >= 80100:
+ return self.skipTest("tpc is supported")
+
+ self.assertRaises(psycopg2.NotSupportedError,
+ cnn.xid, 42, "foo", "bar")
+
class IsolationLevelsTestCase(unittest.TestCase):
def setUp(self):
conn = self.connect()
cur = conn.cursor()
- cur.execute("drop table if exists isolevel;")
+ try:
+ cur.execute("drop table isolevel;")
+ except psycopg2.ProgrammingError:
+ conn.rollback()
cur.execute("create table isolevel (id integer);")
conn.commit()
conn.close()
@@ -244,18 +255,16 @@ def skip_if_tpc_disabled(f):
try:
cur.execute("SHOW max_prepared_transactions;")
except psycopg2.ProgrammingError:
- # Server version too old: let's die a different death
- mtp = 1
+ return self.skipTest(
+ "server too old: two phase transactions not supported.")
else:
mtp = int(cur.fetchone()[0])
cnn.close()
if not mtp:
- import warnings
- warnings.warn(
+ return self.skipTest(
"server not configured for two phase transactions. "
"set max_prepared_transactions to > 0 to run the test")
- return
return f(self)
skip_if_tpc_disabled_.__name__ = f.__name__
@@ -274,9 +283,15 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cnn = self.connect()
cnn.set_isolation_level(0)
cur = cnn.cursor()
- cur.execute(
- "select gid from pg_prepared_xacts where database = %s",
- (tests.dbname,))
+ try:
+ cur.execute(
+ "select gid from pg_prepared_xacts where database = %s",
+ (tests.dbname,))
+ except psycopg2.ProgrammingError:
+ cnn.rollback()
+ cnn.close()
+ return
+
gids = [ r[0] for r in cur ]
for gid in gids:
cur.execute("rollback prepared %s;", (gid,))
@@ -285,7 +300,10 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
def make_test_table(self):
cnn = self.connect()
cur = cnn.cursor()
- cur.execute("DROP TABLE IF EXISTS test_tpc;")
+ try:
+ cur.execute("DROP TABLE test_tpc;")
+ except psycopg2.ProgrammingError:
+ cnn.rollback()
cur.execute("CREATE TABLE test_tpc (data text);")
cnn.commit()
cnn.close()
@@ -314,7 +332,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
def connect(self):
return psycopg2.connect(tests.dsn)
- @skip_if_tpc_disabled
def test_tpc_commit(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
@@ -356,7 +373,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records())
- @skip_if_tpc_disabled
def test_tpc_commit_recovered(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
@@ -383,7 +399,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records())
- @skip_if_tpc_disabled
def test_tpc_rollback(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
@@ -425,7 +440,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records())
- @skip_if_tpc_disabled
def test_tpc_rollback_recovered(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
@@ -464,7 +478,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
xns = cnn.tpc_recover()
self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status)
- @skip_if_tpc_disabled
def test_recovered_xids(self):
# insert a few test xns
cnn = self.connect()
@@ -495,7 +508,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual(xid.owner, owner)
self.assertEqual(xid.database, database)
- @skip_if_tpc_disabled
def test_xid_encoding(self):
cnn = self.connect()
xid = cnn.xid(42, "gtrid", "bqual")
@@ -508,7 +520,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
(tests.dbname,))
self.assertEqual('42_Z3RyaWQ=_YnF1YWw=', cur.fetchone()[0])
- @skip_if_tpc_disabled
def test_xid_roundtrip(self):
for fid, gtrid, bqual in [
(0, "", ""),
@@ -532,7 +543,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cnn.tpc_rollback(xid)
- @skip_if_tpc_disabled
def test_unparsed_roundtrip(self):
for tid in [
'',
@@ -585,7 +595,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
x2 = Xid.from_string('99_xxx_yyy')
self.assertEqual(str(x2), '99_xxx_yyy')
- @skip_if_tpc_disabled
def test_xid_unicode(self):
cnn = self.connect()
x1 = cnn.xid(10, u'uni', u'code')
@@ -598,7 +607,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual('uni', xid.gtrid)
self.assertEqual('code', xid.bqual)
- @skip_if_tpc_disabled
def test_xid_unicode_unparsed(self):
# We don't expect people shooting snowmen as transaction ids,
# so if something explodes in an encode error I don't mind.
@@ -615,6 +623,8 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual('transaction-id', xid.gtrid)
self.assertEqual(None, xid.bqual)
+decorate_all_tests(ConnectionTwoPhaseTests, skip_if_tpc_disabled)
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
diff --git a/tests/test_copy.py b/tests/test_copy.py
index 19fabe1..b0d9965 100644
--- a/tests/test_copy.py
+++ b/tests/test_copy.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
import os
import string
-import unittest
+from testutils import unittest, decorate_all_tests
from cStringIO import StringIO
from itertools import cycle, izip
@@ -9,6 +9,15 @@ import psycopg2
import psycopg2.extensions
import tests
+def skip_if_green(f):
+ def skip_if_green_(self):
+ if tests.green:
+ return self.skipTest("copy in async mode currently not supported")
+ else:
+ return f(self)
+
+ return skip_if_green_
+
class MinimalRead(object):
"""A file wrapper exposing the minimal interface to copy from."""
@@ -29,6 +38,7 @@ class MinimalWrite(object):
def write(self, data):
return self.f.write(data)
+
class CopyTests(unittest.TestCase):
def setUp(self):
@@ -124,6 +134,9 @@ class CopyTests(unittest.TestCase):
self.assertEqual(ntests, len(string.letters))
+decorate_all_tests(CopyTests, skip_if_green)
+
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
diff --git a/tests/test_lobject.py b/tests/test_lobject.py
index f4a684f..88e18d6 100644
--- a/tests/test_lobject.py
+++ b/tests/test_lobject.py
@@ -2,16 +2,33 @@
import os
import shutil
import tempfile
-import unittest
-import warnings
+from testutils import unittest, decorate_all_tests
import psycopg2
import psycopg2.extensions
import tests
+def skip_if_no_lo(f):
+ def skip_if_no_lo_(self):
+ if self.conn.server_version < 80100:
+ return self.skipTest("large objects only supported from PG 8.1")
+ else:
+ return f(self)
-class LargeObjectTests(unittest.TestCase):
+ return skip_if_no_lo_
+def skip_if_green(f):
+ def skip_if_green_(self):
+ if tests.green:
+ return self.skipTest("libpq doesn't support LO in async mode")
+ else:
+ return f(self)
+
+ return skip_if_green_
+
+
+class LargeObjectMixin(object):
+ # doesn't derive from TestCase to avoid repeating tests twice.
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
self.lo_oid = None
@@ -30,6 +47,8 @@ class LargeObjectTests(unittest.TestCase):
lo.unlink()
self.conn.close()
+
+class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
def test_create(self):
lo = self.conn.lobject()
self.assertNotEqual(lo, None)
@@ -261,30 +280,25 @@ class LargeObjectTests(unittest.TestCase):
self.assertTrue(os.path.exists(filename))
self.assertEqual(open(filename, "rb").read(), "some data")
+decorate_all_tests(LargeObjectTests, skip_if_no_lo)
+decorate_all_tests(LargeObjectTests, skip_if_green)
-class LargeObjectTruncateTests(LargeObjectTests):
-
- skip = None
- def setUp(self):
- LargeObjectTests.setUp(self)
+def skip_if_no_truncate(f):
+ def skip_if_no_truncate_(self):
+ if self.conn.server_version < 80300:
+ return self.skipTest(
+ "the server doesn't support large object truncate")
- if self.skip is None:
- self.skip = False
- if self.conn.server_version < 80300:
- warnings.warn("Large object truncate tests skipped, "
- "the server does not support them")
- self.skip = True
+ if not hasattr(psycopg2.extensions.lobject, 'truncate'):
+ return self.skipTest(
+ "psycopg2 has been built against a libpq "
+ "without large object truncate support.")
- if not hasattr(psycopg2.extensions.lobject, 'truncate'):
- warnings.warn("Large object truncate tests skipped, "
- "psycopg2 has been built against an old library")
- self.skip = True
+ return f(self)
+class LargeObjectTruncateTests(LargeObjectMixin, unittest.TestCase):
def test_truncate(self):
- if self.skip:
- return
-
lo = self.conn.lobject()
lo.write("some data")
lo.close()
@@ -308,23 +322,22 @@ class LargeObjectTruncateTests(LargeObjectTests):
self.assertEqual(lo.read(), "")
def test_truncate_after_close(self):
- if self.skip:
- return
-
lo = self.conn.lobject()
lo.close()
self.assertRaises(psycopg2.InterfaceError, lo.truncate)
def test_truncate_after_commit(self):
- if self.skip:
- return
-
lo = self.conn.lobject()
self.lo_oid = lo.oid
self.conn.commit()
self.assertRaises(psycopg2.ProgrammingError, lo.truncate)
+decorate_all_tests(LargeObjectTruncateTests, skip_if_no_lo)
+decorate_all_tests(LargeObjectTruncateTests, skip_if_green)
+decorate_all_tests(LargeObjectTruncateTests, skip_if_no_truncate)
+
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
diff --git a/tests/test_notify.py b/tests/test_notify.py
index 8dd8c70..75dae1e 100755
--- a/tests/test_notify.py
+++ b/tests/test_notify.py
@@ -1,6 +1,5 @@
#!/usr/bin/env python
-import unittest
-import warnings
+from testutils import unittest
import psycopg2
from psycopg2 import extensions
@@ -127,9 +126,8 @@ conn.close()
def test_notify_payload(self):
if self.conn.server_version < 90000:
- warnings.warn("server version %s doesn't support notify payload: skipping test"
+ return self.skipTest("server version %s doesn't support notify payload"
% self.conn.server_version)
- return
self.autocommit(self.conn)
self.listen('foo')
pid = int(self.notify('foo', payload="Hello, world!").communicate()[0])
diff --git a/tests/test_psycopg2_dbapi20.py b/tests/test_psycopg2_dbapi20.py
index 1d83f06..93fdfa6 100755
--- a/tests/test_psycopg2_dbapi20.py
+++ b/tests/test_psycopg2_dbapi20.py
@@ -2,7 +2,7 @@
import dbapi20
import dbapi20_tpc
from test_connection import skip_if_tpc_disabled
-import unittest
+from testutils import unittest, decorate_all_tests
import psycopg2
import tests
@@ -23,19 +23,14 @@ class Psycopg2Tests(dbapi20.DatabaseAPI20Test):
pass
-class Psycopg2TPCTests(dbapi20_tpc.TwoPhaseCommitTests):
+class Psycopg2TPCTests(dbapi20_tpc.TwoPhaseCommitTests, unittest.TestCase):
driver = psycopg2
def connect(self):
return psycopg2.connect(dsn=tests.dsn)
- @skip_if_tpc_disabled
- def test_tpc_commit_with_prepare(self):
- super(Psycopg2TPCTests, self).test_tpc_commit_with_prepare()
+decorate_all_tests(Psycopg2TPCTests, skip_if_tpc_disabled)
- @skip_if_tpc_disabled
- def test_tpc_rollback_with_prepare(self):
- super(Psycopg2TPCTests, self).test_tpc_rollback_with_prepare()
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
diff --git a/tests/test_quote.py b/tests/test_quote.py
index 6da10fa..95c5d7a 100755
--- a/tests/test_quote.py
+++ b/tests/test_quote.py
@@ -1,6 +1,5 @@
#!/usr/bin/env python
-import unittest
-import warnings
+from testutils import unittest
import psycopg2
import psycopg2.extensions
@@ -61,9 +60,9 @@ class QuotingTestCase(unittest.TestCase):
curs.execute("SHOW server_encoding")
server_encoding = curs.fetchone()[0]
if server_encoding != "UTF8":
- warnings.warn("Unicode test skipped since server encoding is %s"
- % server_encoding)
- return
+ return self.skipTest(
+ "Unicode test skipped since server encoding is %s"
+ % server_encoding)
data = u"""some data with \t chars
to escape into, 'quotes', \u20ac euro sign and \\ a backslash too.
diff --git a/tests/test_transaction.py b/tests/test_transaction.py
index 030bf17..3ee5ee7 100755
--- a/tests/test_transaction.py
+++ b/tests/test_transaction.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
import threading
-import unittest
+from testutils import unittest
import psycopg2
from psycopg2.extensions import (
@@ -215,8 +215,11 @@ class QueryCancellationTests(unittest.TestCase):
curs = self.conn.cursor()
# Set a low statement timeout, then sleep for a longer period.
curs.execute('SET statement_timeout TO 10')
- self.assertRaises(psycopg2.extensions.QueryCanceledError,
- curs.execute, 'SELECT pg_sleep(50)')
+ try:
+ self.assertRaises(psycopg2.extensions.QueryCanceledError,
+ curs.execute, 'SELECT pg_sleep(50)')
+ except psycopg2.ProgrammingError:
+ return self.skipTest("pg_sleep not available")
def test_suite():
diff --git a/tests/testutils.py b/tests/testutils.py
new file mode 100644
index 0000000..c047827
--- /dev/null
+++ b/tests/testutils.py
@@ -0,0 +1,46 @@
+# Utility module for psycopg2 testing.
+#
+# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+
+# Use unittest2 if available. Otherwise mock a skip facility with warnings.
+
+try:
+ import unittest2
+ unittest = unittest2
+except ImportError:
+ import unittest
+ unittest2 = None
+
+if hasattr(unittest, 'skipIf'):
+ from unittest2 import skip, skipIf
+
+else:
+ import warnings
+
+ def skipIf(cond, msg):
+ def skipIf_(f):
+ def skipIf__(self):
+ if cond:
+ warnings.warn(msg)
+ return
+ else:
+ return f(self)
+ return skipIf__
+ return skipIf_
+
+ def skip(msg):
+ return skipIf(True, msg)
+
+ def skipTest(self, msg):
+ warnings.warn(msg)
+ return
+
+ unittest.TestCase.skipTest = skipTest
+
+
+def decorate_all_tests(cls, decorator):
+ """Apply *decorator* to all the tests defined in the TestCase *cls*."""
+ for n in dir(cls):
+ if n.startswith('test'):
+ setattr(cls, n, decorator(getattr(cls, n)))
+
diff --git a/tests/types_basic.py b/tests/types_basic.py
index aee4754..8e6f199 100755
--- a/tests/types_basic.py
+++ b/tests/types_basic.py
@@ -27,7 +27,7 @@ try:
except:
pass
import sys
-import unittest
+from testutils import unittest
import psycopg2
import tests
@@ -78,14 +78,14 @@ class TypesBasicTests(unittest.TestCase):
s = self.execute("SELECT %s AS foo", (decimal.Decimal("-infinity"),))
self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s))
self.failUnless(type(s) == decimal.Decimal, "wrong decimal conversion: " + repr(s))
+ else:
+ return self.skipTest("decimal not available")
def testFloat(self):
try:
float("nan")
except ValueError:
- import warnings
- warnings.warn("nan not available on this platform")
- return
+ return self.skipTest("nan not available on this platform")
s = self.execute("SELECT %s AS foo", (float("nan"),))
self.failUnless(str(s) == "nan", "wrong float quoting: " + str(s))
diff --git a/tests/types_extras.py b/tests/types_extras.py
index dc5a585..062289e 100644
--- a/tests/types_extras.py
+++ b/tests/types_extras.py
@@ -20,14 +20,35 @@ except:
pass
import re
import sys
-import unittest
-import warnings
+from testutils import unittest
import psycopg2
import psycopg2.extras
import tests
+def skip_if_no_uuid(f):
+ def skip_if_no_uuid_(self):
+ try:
+ cur = self.conn.cursor()
+ cur.execute("select typname from pg_type where typname = 'uuid'")
+ has = cur.fetchone()
+ finally:
+ self.conn.rollback()
+
+ if has:
+ return f(self)
+ else:
+ return self.skipTest("uuid type not available")
+
+ return skip_if_no_uuid_
+
+def filter_scs(conn, s):
+ if conn.get_parameter_status("standard_conforming_strings") == 'off':
+ return s
+ else:
+ return s.replace("E'", "'")
+
class TypesExtrasTests(unittest.TestCase):
"""Test that all type conversions are working."""
@@ -39,12 +60,10 @@ class TypesExtrasTests(unittest.TestCase):
curs.execute(*args)
return curs.fetchone()[0]
+ @skip_if_no_uuid
def testUUID(self):
- try:
- import uuid
- psycopg2.extras.register_uuid()
- except:
- return
+ import uuid
+ psycopg2.extras.register_uuid()
u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350')
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
@@ -52,12 +71,10 @@ class TypesExtrasTests(unittest.TestCase):
s = self.execute("SELECT NULL::uuid AS foo")
self.failUnless(s is None)
+ @skip_if_no_uuid
def testUUIDARRAY(self):
- try:
- import uuid
- psycopg2.extras.register_uuid()
- except:
- return
+ import uuid
+ psycopg2.extras.register_uuid()
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
@@ -86,13 +103,17 @@ class TypesExtrasTests(unittest.TestCase):
i = Inet("192.168.1.0/24")
a = psycopg2.extensions.adapt(i)
a.prepare(self.conn)
- self.assertEqual("E'192.168.1.0/24'::inet", a.getquoted())
+ self.assertEqual(
+ filter_scs(self.conn, "E'192.168.1.0/24'::inet"),
+ a.getquoted())
# adapts ok with unicode too
i = Inet(u"192.168.1.0/24")
a = psycopg2.extensions.adapt(i)
a.prepare(self.conn)
- self.assertEqual("E'192.168.1.0/24'::inet", a.getquoted())
+ self.assertEqual(
+ filter_scs(self.conn, "E'192.168.1.0/24'::inet"),
+ a.getquoted())
def test_adapt_fail(self):
class Foo(object): pass
@@ -109,8 +130,7 @@ def skip_if_no_hstore(f):
from psycopg2.extras import HstoreAdapter
oids = HstoreAdapter.get_oids(self.conn)
if oids is None:
- warnings.warn("hstore not available in test database: skipping test")
- return
+ return self.skipTest("hstore not available in test database")
return f(self)
return skip_if_no_hstore_
@@ -121,8 +141,7 @@ class HstoreTestCase(unittest.TestCase):
def test_adapt_8(self):
if self.conn.server_version >= 90000:
- warnings.warn("skipping dict adaptation with PG pre-9 syntax")
- return
+ return self.skipTest("skipping dict adaptation with PG pre-9 syntax")
from psycopg2.extras import HstoreAdapter
@@ -136,16 +155,15 @@ class HstoreTestCase(unittest.TestCase):
ii = q[1:-1].split("||")
ii.sort()
- self.assertEqual(ii[0], "(E'a' => E'1')")
- self.assertEqual(ii[1], "(E'b' => E'''')")
- self.assertEqual(ii[2], "(E'c' => NULL)")
+ self.assertEqual(ii[0], filter_scs(self.conn, "(E'a' => E'1')"))
+ self.assertEqual(ii[1], filter_scs(self.conn, "(E'b' => E'''')"))
+ self.assertEqual(ii[2], filter_scs(self.conn, "(E'c' => NULL)"))
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
- self.assertEqual(ii[3], "(E'd' => E'%s')" % encc)
+ self.assertEqual(ii[3], filter_scs(self.conn, "(E'd' => E'%s')" % encc))
def test_adapt_9(self):
if self.conn.server_version < 90000:
- warnings.warn("skipping dict adaptation with PG 9 syntax")
- return
+ return self.skipTest("skipping dict adaptation with PG 9 syntax")
from psycopg2.extras import HstoreAdapter