summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xtests/test_async.py4
-rwxr-xr-xtests/test_cancel.py6
-rwxr-xr-xtests/test_connection.py4
-rwxr-xr-xtests/test_cursor.py6
-rwxr-xr-xtests/test_transaction.py4
-rw-r--r--tests/testutils.py96
-rwxr-xr-xtests/types_basic.py16
7 files changed, 74 insertions, 62 deletions
diff --git a/tests/test_async.py b/tests/test_async.py
index ea5f1f1..07a3c42 100755
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -23,7 +23,7 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
-from testutils import unittest, skip_if_no_pg_sleep
+from testutils import unittest, skip_before_postgres
import psycopg2
from psycopg2 import extensions
@@ -113,7 +113,7 @@ class AsyncTests(unittest.TestCase):
self.assertFalse(self.conn.isexecuting())
self.assertEquals(cur.fetchone()[0], "a")
- @skip_if_no_pg_sleep('conn')
+ @skip_before_postgres(8, 2)
def test_async_callproc(self):
cur = self.conn.cursor()
cur.callproc("pg_sleep", (0.1, ))
diff --git a/tests/test_cancel.py b/tests/test_cancel.py
index dcdbf41..6d58ddc 100755
--- a/tests/test_cancel.py
+++ b/tests/test_cancel.py
@@ -31,7 +31,7 @@ import psycopg2.extensions
from psycopg2 import extras
from testconfig import dsn
-from testutils import unittest, skip_if_no_pg_sleep
+from testutils import unittest, skip_before_postgres
class CancelTests(unittest.TestCase):
@@ -50,7 +50,7 @@ class CancelTests(unittest.TestCase):
def test_empty_cancel(self):
self.conn.cancel()
- @skip_if_no_pg_sleep('conn')
+ @skip_before_postgres(8, 2)
def test_cancel(self):
errors = []
@@ -86,7 +86,7 @@ class CancelTests(unittest.TestCase):
self.assertEqual(errors, [])
- @skip_if_no_pg_sleep('conn')
+ @skip_before_postgres(8, 2)
def test_async_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 616ff28..f4b7317 100755
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -24,7 +24,7 @@
import time
import threading
-from testutils import unittest, decorate_all_tests, skip_if_no_pg_sleep
+from testutils import unittest, decorate_all_tests, skip_before_postgres
from operator import attrgetter
import psycopg2
@@ -114,7 +114,7 @@ class ConnectionTests(unittest.TestCase):
self.assertRaises(psycopg2.NotSupportedError,
cnn.xid, 42, "foo", "bar")
- @skip_if_no_pg_sleep('conn')
+ @skip_before_postgres(8, 2)
def test_concurrent_execution(self):
def slave():
cnn = psycopg2.connect(dsn)
diff --git a/tests/test_cursor.py b/tests/test_cursor.py
index 4283596..69eca64 100755
--- a/tests/test_cursor.py
+++ b/tests/test_cursor.py
@@ -27,7 +27,7 @@ import psycopg2
import psycopg2.extensions
from psycopg2.extensions import b
from testconfig import dsn
-from testutils import unittest, skip_if_no_pg_sleep
+from testutils import unittest, skip_before_postgres
class CursorTests(unittest.TestCase):
@@ -130,7 +130,7 @@ class CursorTests(unittest.TestCase):
del curs
self.assert_(w() is None)
- @skip_if_no_pg_sleep('conn')
+ @skip_before_postgres(8, 2)
def test_iter_named_cursor_efficient(self):
curs = self.conn.cursor('tmp')
# if these records are fetched in the same roundtrip their
@@ -144,6 +144,7 @@ class CursorTests(unittest.TestCase):
"named cursor records fetched in 2 roundtrips (delta: %s)"
% (t2 - t1))
+ @skip_before_postgres(8, 0)
def test_iter_named_cursor_default_arraysize(self):
curs = self.conn.cursor('tmp')
curs.execute('select generate_series(1,50)')
@@ -151,6 +152,7 @@ class CursorTests(unittest.TestCase):
# everything swallowed in one gulp
self.assertEqual(rv, [(i,i) for i in range(1,51)])
+ @skip_before_postgres(8, 0)
def test_iter_named_cursor_arraysize(self):
curs = self.conn.cursor('tmp')
curs.arraysize = 30
diff --git a/tests/test_transaction.py b/tests/test_transaction.py
index cab9c45..90e159a 100755
--- a/tests/test_transaction.py
+++ b/tests/test_transaction.py
@@ -23,7 +23,7 @@
# License for more details.
import threading
-from testutils import unittest, skip_if_no_pg_sleep
+from testutils import unittest, skip_before_postgres
import psycopg2
from psycopg2.extensions import (
@@ -236,7 +236,7 @@ class QueryCancellationTests(unittest.TestCase):
def tearDown(self):
self.conn.close()
- @skip_if_no_pg_sleep('conn')
+ @skip_before_postgres(8, 2)
def test_statement_timeout(self):
curs = self.conn.cursor()
# Set a low statement timeout, then sleep for a longer period.
diff --git a/tests/testutils.py b/tests/testutils.py
index 8942945..8e99f04 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -102,31 +102,6 @@ def skip_if_no_uuid(f):
return skip_if_no_uuid_
-def skip_if_no_pg_sleep(name):
- """Decorator to skip a test if pg_sleep is not supported by the server.
-
- Pass it the name of an attribute containing a connection or of a method
- returning a connection.
- """
- def skip_if_no_pg_sleep_(f):
- def skip_if_no_pg_sleep__(self):
- cnn = getattr(self, name)
- if callable(cnn):
- cnn = cnn()
-
- if cnn.server_version < 80200:
- return self.skipTest(
- "server version %s doesn't support pg_sleep"
- % cnn.server_version)
-
- return f(self)
-
- skip_if_no_pg_sleep__.__name__ = f.__name__
- return skip_if_no_pg_sleep__
-
- return skip_if_no_pg_sleep_
-
-
def skip_if_tpc_disabled(f):
"""Skip a test if the server has tpc support disabled."""
def skip_if_tpc_disabled_(self):
@@ -165,25 +140,60 @@ def skip_if_no_iobase(f):
return skip_if_no_iobase_
-def skip_on_python2(f):
- """Skip a test on Python 3 and following."""
- def skip_on_python2_(self):
- if sys.version_info[0] < 3:
- return self.skipTest("skipped because Python 2")
- else:
- return f(self)
-
- return skip_on_python2_
-
-def skip_on_python3(f):
- """Skip a test on Python 3 and following."""
- def skip_on_python3_(self):
- if sys.version_info[0] >= 3:
- return self.skipTest("skipped because Python 3")
- else:
- return f(self)
+def skip_before_postgres(*ver):
+ """Skip a test on PostgreSQL before a certain version."""
+ ver = ver + (0,) * (3 - len(ver))
+ def skip_before_postgres_(f):
+ def skip_before_postgres__(self):
+ if self.conn.server_version < int("%d%02d%02d" % ver):
+ return self.skipTest("skipped because PostgreSQL %s"
+ % self.conn.server_version)
+ else:
+ return f(self)
+
+ return skip_before_postgres__
+ return skip_before_postgres_
+
+def skip_after_postgres(*ver):
+ """Skip a test on PostgreSQL after (including) a certain version."""
+ ver = ver + (0,) * (3 - len(ver))
+ def skip_after_postgres_(f):
+ def skip_after_postgres__(self):
+ if self.conn.server_version >= int("%d%02d%02d" % ver):
+ return self.skipTest("skipped because PostgreSQL %s"
+ % self.conn.server_version)
+ else:
+ return f(self)
+
+ return skip_after_postgres__
+ return skip_after_postgres_
+
+def skip_before_python(*ver):
+ """Skip a test on Python before a certain version."""
+ def skip_before_python_(f):
+ def skip_before_python__(self):
+ if sys.version_info[:len(ver)] < ver:
+ return self.skipTest("skipped because Python %s"
+ % ".".join(map(str, sys.version_info[:len(ver)])))
+ else:
+ return f(self)
+
+ return skip_before_python__
+ return skip_before_python_
+
+def skip_from_python(*ver):
+ """Skip a test on Python after (including) a certain version."""
+ def skip_from_python_(f):
+ def skip_from_python__(self):
+ if sys.version_info[:len(ver)] >= ver:
+ return self.skipTest("skipped because Python %s"
+ % ".".join(map(str, sys.version_info[:len(ver)])))
+ else:
+ return f(self)
+
+ return skip_from_python__
+ return skip_from_python_
- return skip_on_python3_
def script_to_py3(script):
"""Convert a script to Python3 syntax if required."""
diff --git a/tests/types_basic.py b/tests/types_basic.py
index fa9c062..ab90502 100755
--- a/tests/types_basic.py
+++ b/tests/types_basic.py
@@ -171,7 +171,7 @@ class TypesBasicTests(unittest.TestCase):
curs.execute("select col from array_test where id = 2")
self.assertEqual(curs.fetchone()[0], [])
- @testutils.skip_on_python3
+ @testutils.skip_from_python(3)
def testTypeRoundtripBuffer(self):
o1 = buffer("".join(map(chr, range(256))))
o2 = self.execute("select %s;", (o1,))
@@ -182,14 +182,14 @@ class TypesBasicTests(unittest.TestCase):
o2 = self.execute("select %s;", (o1,))
self.assertEqual(type(o1), type(o2))
- @testutils.skip_on_python3
+ @testutils.skip_from_python(3)
def testTypeRoundtripBufferArray(self):
o1 = buffer("".join(map(chr, range(256))))
o1 = [o1]
o2 = self.execute("select %s;", (o1,))
self.assertEqual(type(o1[0]), type(o2[0]))
- @testutils.skip_on_python2
+ @testutils.skip_before_python(3)
def testTypeRoundtripBytes(self):
o1 = bytes(range(256))
o2 = self.execute("select %s;", (o1,))
@@ -200,14 +200,14 @@ class TypesBasicTests(unittest.TestCase):
o2 = self.execute("select %s;", (o1,))
self.assertEqual(memoryview, type(o2))
- @testutils.skip_on_python2
+ @testutils.skip_before_python(3)
def testTypeRoundtripBytesArray(self):
o1 = bytes(range(256))
o1 = [o1]
o2 = self.execute("select %s;", (o1,))
self.assertEqual(memoryview, type(o2[0]))
- @testutils.skip_on_python2
+ @testutils.skip_before_python(3)
def testAdaptBytearray(self):
o1 = bytearray(range(256))
o2 = self.execute("select %s;", (o1,))
@@ -218,7 +218,7 @@ class TypesBasicTests(unittest.TestCase):
o2 = self.execute("select %s;", (o1,))
self.assertEqual(memoryview, type(o2))
- @testutils.skip_on_python2
+ @testutils.skip_before_python(3)
def testAdaptMemoryview(self):
o1 = memoryview(bytes(range(256)))
o2 = self.execute("select %s;", (o1,))
@@ -253,7 +253,7 @@ class AdaptSubclassTest(unittest.TestCase):
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
del psycopg2.extensions.adapters[B, psycopg2.extensions.ISQLQuote]
- @testutils.skip_on_python3
+ @testutils.skip_from_python(3)
def test_no_mro_no_joy(self):
from psycopg2.extensions import adapt, register_adapter, AsIs
@@ -267,7 +267,7 @@ class AdaptSubclassTest(unittest.TestCase):
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
- @testutils.skip_on_python2
+ @testutils.skip_before_python(3)
def test_adapt_subtype_3(self):
from psycopg2.extensions import adapt, register_adapter, AsIs