summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xtests/__init__.py32
-rwxr-xr-xtests/bug_gc.py8
-rw-r--r--tests/extras_dictcursor.py9
-rwxr-xr-xtests/test_async.py13
-rw-r--r--tests/test_cancel.py10
-rw-r--r--tests/test_connection.py28
-rw-r--r--tests/test_copy.py6
-rw-r--r--tests/test_cursor.py4
-rw-r--r--tests/test_dates.py6
-rw-r--r--tests/test_green.py4
-rw-r--r--tests/test_lobject.py8
-rwxr-xr-xtests/test_notify.py12
-rwxr-xr-xtests/test_psycopg2_dbapi20.py6
-rwxr-xr-xtests/test_quote.py4
-rwxr-xr-xtests/test_transaction.py9
-rw-r--r--tests/testconfig.py33
-rwxr-xr-xtests/types_basic.py4
-rw-r--r--tests/types_extras.py6
18 files changed, 96 insertions, 106 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
index ad8feb2..cc6a92e 100755
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,37 +1,9 @@
#!/usr/bin/env python
-import os
import sys
+from testconfig import dsn
from testutils import unittest
-dbname = os.environ.get('PSYCOPG2_TESTDB', 'psycopg2_test')
-dbhost = os.environ.get('PSYCOPG2_TESTDB_HOST', None)
-dbport = os.environ.get('PSYCOPG2_TESTDB_PORT', None)
-dbuser = os.environ.get('PSYCOPG2_TESTDB_USER', None)
-
-# Check if we want to test psycopg's green path.
-green = os.environ.get('PSYCOPG2_TEST_GREEN', None)
-if green:
- if green == '1':
- from psycopg2.extras import wait_select as wait_callback
- elif green == 'eventlet':
- from eventlet.support.psycopg2_patcher import eventlet_wait_callback \
- as wait_callback
- else:
- raise ValueError("please set 'PSYCOPG2_TEST_GREEN' to a valid value")
-
- import psycopg2.extensions
- psycopg2.extensions.set_wait_callback(wait_callback)
-
-# Construct a DSN to connect to the test database:
-dsn = 'dbname=%s' % dbname
-if dbhost is not None:
- dsn += ' host=%s' % dbhost
-if dbport is not None:
- dsn += ' port=%s' % dbport
-if dbuser is not None:
- dsn += ' user=%s' % dbuser
-
# If connection to test db fails, bail out early.
import psycopg2
try:
@@ -81,4 +53,4 @@ def test_suite():
return suite
if __name__ == '__main__':
- unittest.main(defaultTest='test_suite')
+ unittest.main(defaultTest=test_suite)
diff --git a/tests/bug_gc.py b/tests/bug_gc.py
index 89d90d2..3cdcc6a 100755
--- a/tests/bug_gc.py
+++ b/tests/bug_gc.py
@@ -6,18 +6,14 @@ import time
import unittest
import gc
-import sys
-if sys.version_info < (3,):
- import tests
-else:
- import py3tests as tests
+from testconfig import dsn
class StolenReferenceTestCase(unittest.TestCase):
def test_stolen_reference_bug(self):
def fish(val, cur):
gc.collect()
return 42
- conn = psycopg2.connect(tests.dsn)
+ conn = psycopg2.connect(dsn)
UUID = psycopg2.extensions.new_type((2950,), "UUID", fish)
psycopg2.extensions.register_type(UUID, conn)
curs = conn.cursor()
diff --git a/tests/extras_dictcursor.py b/tests/extras_dictcursor.py
index c2bacb0..1bb44ad 100644
--- a/tests/extras_dictcursor.py
+++ b/tests/extras_dictcursor.py
@@ -17,15 +17,14 @@
import psycopg2
import psycopg2.extras
from testutils import unittest
-
-import tests
+from testconfig import dsn
class ExtrasDictCursorTests(unittest.TestCase):
"""Test if DictCursor extension class works."""
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
curs = self.conn.cursor()
curs.execute("CREATE TEMPORARY TABLE ExtrasDictCursorTests (foo text)")
curs.execute("INSERT INTO ExtrasDictCursorTests VALUES ('bar')")
@@ -135,7 +134,7 @@ class NamedTupleCursorTest(unittest.TestCase):
self.conn = None
return
- self.conn = psycopg2.connect(tests.dsn,
+ self.conn = psycopg2.connect(dsn,
connection_factory=NamedTupleConnection)
curs = self.conn.cursor()
curs.execute("CREATE TEMPORARY TABLE nttest (i int, s text)")
@@ -207,7 +206,7 @@ class NamedTupleCursorTest(unittest.TestCase):
try:
if self.conn is not None:
self.conn.close()
- self.conn = psycopg2.connect(tests.dsn,
+ self.conn = psycopg2.connect(dsn,
connection_factory=NamedTupleConnection)
curs = self.conn.cursor()
curs.execute("select 1")
diff --git a/tests/test_async.py b/tests/test_async.py
index 96d7a2c..be4a1d8 100755
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -8,12 +8,7 @@ import time
import select
import StringIO
-import sys
-if sys.version_info < (3,):
- import tests
-else:
- import py3tests as tests
-
+from testconfig import dsn
class PollableStub(object):
"""A 'pollable' wrapper allowing analysis of the `poll()` calls."""
@@ -33,8 +28,8 @@ class PollableStub(object):
class AsyncTests(unittest.TestCase):
def setUp(self):
- self.sync_conn = psycopg2.connect(tests.dsn)
- self.conn = psycopg2.connect(tests.dsn, async=True)
+ self.sync_conn = psycopg2.connect(dsn)
+ self.conn = psycopg2.connect(dsn, async=True)
self.wait(self.conn)
@@ -309,7 +304,7 @@ class AsyncTests(unittest.TestCase):
def __init__(self, dsn, async=0):
psycopg2.extensions.connection.__init__(self, dsn, async=async)
- conn = psycopg2.connect(tests.dsn, connection_factory=MyConn, async=True)
+ conn = psycopg2.connect(dsn, connection_factory=MyConn, async=True)
self.assert_(isinstance(conn, MyConn))
self.assert_(conn.async)
conn.close()
diff --git a/tests/test_cancel.py b/tests/test_cancel.py
index 52383f8..8e651ff 100644
--- a/tests/test_cancel.py
+++ b/tests/test_cancel.py
@@ -2,18 +2,18 @@
import time
import threading
-from testutils import unittest, skip_if_no_pg_sleep
-import tests
import psycopg2
import psycopg2.extensions
from psycopg2 import extras
+from testconfig import dsn
+from testutils import unittest, skip_if_no_pg_sleep
class CancelTests(unittest.TestCase):
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
cur = self.conn.cursor()
cur.execute('''
CREATE TEMPORARY TABLE table1 (
@@ -65,7 +65,7 @@ class CancelTests(unittest.TestCase):
@skip_if_no_pg_sleep('conn')
def test_async_cancel(self):
- async_conn = psycopg2.connect(tests.dsn, async=True)
+ async_conn = psycopg2.connect(dsn, async=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
@@ -79,7 +79,7 @@ class CancelTests(unittest.TestCase):
self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self):
- async_conn = psycopg2.connect(tests.dsn, async=True)
+ async_conn = psycopg2.connect(dsn, async=True)
async_conn.close()
self.assertTrue(async_conn.closed)
diff --git a/tests/test_connection.py b/tests/test_connection.py
index de775e0..1b34b6c 100644
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -7,12 +7,12 @@ from operator import attrgetter
import psycopg2
import psycopg2.extensions
-import tests
+from testconfig import dsn, dbname
class ConnectionTests(unittest.TestCase):
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
def tearDown(self):
if not self.conn.closed:
@@ -95,7 +95,7 @@ class ConnectionTests(unittest.TestCase):
@skip_if_no_pg_sleep('conn')
def test_concurrent_execution(self):
def slave():
- cnn = psycopg2.connect(tests.dsn)
+ cnn = psycopg2.connect(dsn)
cur = cnn.cursor()
cur.execute("select pg_sleep(2)")
cur.close()
@@ -141,7 +141,7 @@ class IsolationLevelsTestCase(unittest.TestCase):
conn.close()
def connect(self):
- conn = psycopg2.connect(tests.dsn)
+ conn = psycopg2.connect(dsn)
self._conns.append(conn)
return conn
@@ -333,7 +333,7 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
try:
cur.execute(
"select gid from pg_prepared_xacts where database = %s",
- (tests.dbname,))
+ (dbname,))
except psycopg2.ProgrammingError:
cnn.rollback()
cnn.close()
@@ -362,7 +362,7 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cur.execute("""
select count(*) from pg_prepared_xacts
where database = %s;""",
- (tests.dbname,))
+ (dbname,))
rv = cur.fetchone()[0]
cnn.close()
return rv
@@ -377,7 +377,7 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
return rv
def connect(self):
- conn = psycopg2.connect(tests.dsn)
+ conn = psycopg2.connect(dsn)
self._conns.append(conn)
return conn
@@ -540,13 +540,13 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
select gid, prepared, owner, database
from pg_prepared_xacts
where database = %s;""",
- (tests.dbname,))
+ (dbname,))
okvals = cur.fetchall()
okvals.sort()
cnn = self.connect()
xids = cnn.tpc_recover()
- xids = [ xid for xid in xids if xid.database == tests.dbname ]
+ xids = [ xid for xid in xids if xid.database == dbname ]
xids.sort(key=attrgetter('gtrid'))
# check the values returned
@@ -566,7 +566,7 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cnn = self.connect()
cur = cnn.cursor()
cur.execute("select gid from pg_prepared_xacts where database = %s;",
- (tests.dbname,))
+ (dbname,))
self.assertEqual('42_Z3RyaWQ=_YnF1YWw=', cur.fetchone()[0])
def test_xid_roundtrip(self):
@@ -583,7 +583,7 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cnn = self.connect()
xids = [ xid for xid in cnn.tpc_recover()
- if xid.database == tests.dbname ]
+ if xid.database == dbname ]
self.assertEqual(1, len(xids))
xid = xids[0]
self.assertEqual(xid.format_id, fid)
@@ -605,7 +605,7 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cnn = self.connect()
xids = [ xid for xid in cnn.tpc_recover()
- if xid.database == tests.dbname ]
+ if xid.database == dbname ]
self.assertEqual(1, len(xids))
xid = xids[0]
self.assertEqual(xid.format_id, None)
@@ -651,7 +651,7 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cnn.tpc_prepare()
cnn.reset()
xid = [ xid for xid in cnn.tpc_recover()
- if xid.database == tests.dbname ][0]
+ if xid.database == dbname ][0]
self.assertEqual(10, xid.format_id)
self.assertEqual('uni', xid.gtrid)
self.assertEqual('code', xid.bqual)
@@ -667,7 +667,7 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cnn.reset()
xid = [ xid for xid in cnn.tpc_recover()
- if xid.database == tests.dbname ][0]
+ if xid.database == dbname ][0]
self.assertEqual(None, xid.format_id)
self.assertEqual('transaction-id', xid.gtrid)
self.assertEqual(None, xid.bqual)
diff --git a/tests/test_copy.py b/tests/test_copy.py
index b0d9965..2f09d5a 100644
--- a/tests/test_copy.py
+++ b/tests/test_copy.py
@@ -7,11 +7,11 @@ from itertools import cycle, izip
import psycopg2
import psycopg2.extensions
-import tests
+from testconfig import dsn, green
def skip_if_green(f):
def skip_if_green_(self):
- if tests.green:
+ if green:
return self.skipTest("copy in async mode currently not supported")
else:
return f(self)
@@ -42,7 +42,7 @@ class MinimalWrite(object):
class CopyTests(unittest.TestCase):
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
curs = self.conn.cursor()
curs.execute('''
CREATE TEMPORARY TABLE tcopy (
diff --git a/tests/test_cursor.py b/tests/test_cursor.py
index aedb5f5..d30a50e 100644
--- a/tests/test_cursor.py
+++ b/tests/test_cursor.py
@@ -3,12 +3,12 @@
import unittest
import psycopg2
import psycopg2.extensions
-import tests
+from testconfig import dsn
class CursorTests(unittest.TestCase):
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
def tearDown(self):
self.conn.close()
diff --git a/tests/test_dates.py b/tests/test_dates.py
index 9958913..44b1bc5 100644
--- a/tests/test_dates.py
+++ b/tests/test_dates.py
@@ -2,9 +2,9 @@
import math
import unittest
-import tests
import psycopg2
from psycopg2.tz import FixedOffsetTimezone
+from testconfig import dsn
class CommonDatetimeTestsMixin:
@@ -75,7 +75,7 @@ class DatetimeTests(unittest.TestCase, CommonDatetimeTestsMixin):
"""Tests for the datetime based date handling in psycopg2."""
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
self.curs = self.conn.cursor()
self.DATE = psycopg2._psycopg.PYDATE
self.TIME = psycopg2._psycopg.PYTIME
@@ -293,7 +293,7 @@ class mxDateTimeTests(unittest.TestCase, CommonDatetimeTestsMixin):
"""Tests for the mx.DateTime based date handling in psycopg2."""
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
self.curs = self.conn.cursor()
self.DATE = psycopg2._psycopg.MXDATE
self.TIME = psycopg2._psycopg.MXTIME
diff --git a/tests/test_green.py b/tests/test_green.py
index 07ecdb7..bb9acf2 100644
--- a/tests/test_green.py
+++ b/tests/test_green.py
@@ -4,7 +4,7 @@ import unittest
import psycopg2
import psycopg2.extensions
import psycopg2.extras
-import tests
+from testconfig import dsn
class ConnectionStub(object):
"""A `connection` wrapper allowing analysis of the `poll()` calls."""
@@ -24,7 +24,7 @@ class GreenTests(unittest.TestCase):
def setUp(self):
self._cb = psycopg2.extensions.get_wait_callback()
psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
def tearDown(self):
self.conn.close()
diff --git a/tests/test_lobject.py b/tests/test_lobject.py
index 88e18d6..7c96a6e 100644
--- a/tests/test_lobject.py
+++ b/tests/test_lobject.py
@@ -2,11 +2,11 @@
import os
import shutil
import tempfile
-from testutils import unittest, decorate_all_tests
import psycopg2
import psycopg2.extensions
-import tests
+from testconfig import dsn, green
+from testutils import unittest, decorate_all_tests
def skip_if_no_lo(f):
def skip_if_no_lo_(self):
@@ -19,7 +19,7 @@ def skip_if_no_lo(f):
def skip_if_green(f):
def skip_if_green_(self):
- if tests.green:
+ if green:
return self.skipTest("libpq doesn't support LO in async mode")
else:
return f(self)
@@ -30,7 +30,7 @@ def skip_if_green(f):
class LargeObjectMixin(object):
# doesn't derive from TestCase to avoid repeating tests twice.
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
self.lo_oid = None
self.tmpdir = None
diff --git a/tests/test_notify.py b/tests/test_notify.py
index bc14f37..a15aad6 100755
--- a/tests/test_notify.py
+++ b/tests/test_notify.py
@@ -3,23 +3,19 @@ from testutils import unittest
import psycopg2
from psycopg2 import extensions
+from testconfig import dsn
+import sys
import time
import select
import signal
from subprocess import Popen, PIPE
-import sys
-if sys.version_info < (3,):
- import tests
-else:
- import py3tests as tests
-
class NotifiesTests(unittest.TestCase):
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
def tearDown(self):
self.conn.close()
@@ -54,7 +50,7 @@ curs.execute("NOTIFY " %(name)r %(payload)r)
curs.close()
conn.close()
"""
- % { 'dsn': tests.dsn, 'sec': sec, 'name': name, 'payload': payload})
+ % { 'dsn': dsn, 'sec': sec, 'name': name, 'payload': payload})
return Popen([sys.executable, '-c', script], stdout=PIPE)
diff --git a/tests/test_psycopg2_dbapi20.py b/tests/test_psycopg2_dbapi20.py
index 93fdfa6..dfe8f53 100755
--- a/tests/test_psycopg2_dbapi20.py
+++ b/tests/test_psycopg2_dbapi20.py
@@ -5,12 +5,12 @@ from test_connection import skip_if_tpc_disabled
from testutils import unittest, decorate_all_tests
import psycopg2
-import tests
+from testconfig import dsn
class Psycopg2Tests(dbapi20.DatabaseAPI20Test):
driver = psycopg2
connect_args = ()
- connect_kw_args = {'dsn': tests.dsn}
+ connect_kw_args = {'dsn': dsn}
lower_func = 'lower' # For stored procedure test
@@ -27,7 +27,7 @@ class Psycopg2TPCTests(dbapi20_tpc.TwoPhaseCommitTests, unittest.TestCase):
driver = psycopg2
def connect(self):
- return psycopg2.connect(dsn=tests.dsn)
+ return psycopg2.connect(dsn=dsn)
decorate_all_tests(Psycopg2TPCTests, skip_if_tpc_disabled)
diff --git a/tests/test_quote.py b/tests/test_quote.py
index 95c5d7a..83662c7 100755
--- a/tests/test_quote.py
+++ b/tests/test_quote.py
@@ -3,7 +3,7 @@ from testutils import unittest
import psycopg2
import psycopg2.extensions
-import tests
+from testconfig import dsn
class QuotingTestCase(unittest.TestCase):
r"""Checks the correct quoting of strings and binary objects.
@@ -24,7 +24,7 @@ class QuotingTestCase(unittest.TestCase):
http://www.postgresql.org/docs/8.1/static/runtime-config-compatible.html
"""
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
def tearDown(self):
self.conn.close()
diff --git a/tests/test_transaction.py b/tests/test_transaction.py
index 5bbed38..53ddf85 100755
--- a/tests/test_transaction.py
+++ b/tests/test_transaction.py
@@ -5,13 +5,12 @@ from testutils import unittest, skip_if_no_pg_sleep
import psycopg2
from psycopg2.extensions import (
ISOLATION_LEVEL_SERIALIZABLE, STATUS_BEGIN, STATUS_READY)
-import tests
-
+from testconfig import dsn
class TransactionTests(unittest.TestCase):
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
curs = self.conn.cursor()
curs.execute('''
@@ -75,7 +74,7 @@ class DeadlockSerializationTests(unittest.TestCase):
"""Test deadlock and serialization failure errors."""
def connect(self):
- conn = psycopg2.connect(tests.dsn)
+ conn = psycopg2.connect(dsn)
conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
return conn
@@ -208,7 +207,7 @@ class QueryCancellationTests(unittest.TestCase):
"""Tests for query cancellation."""
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
def tearDown(self):
diff --git a/tests/testconfig.py b/tests/testconfig.py
new file mode 100644
index 0000000..b2730e0
--- /dev/null
+++ b/tests/testconfig.py
@@ -0,0 +1,33 @@
+# Configure the test suite from the env variables.
+
+import os
+
+dbname = os.environ.get('PSYCOPG2_TESTDB', 'psycopg2_test')
+dbhost = os.environ.get('PSYCOPG2_TESTDB_HOST', None)
+dbport = os.environ.get('PSYCOPG2_TESTDB_PORT', None)
+dbuser = os.environ.get('PSYCOPG2_TESTDB_USER', None)
+
+# Check if we want to test psycopg's green path.
+green = os.environ.get('PSYCOPG2_TEST_GREEN', None)
+if green:
+ if green == '1':
+ from psycopg2.extras import wait_select as wait_callback
+ elif green == 'eventlet':
+ from eventlet.support.psycopg2_patcher import eventlet_wait_callback \
+ as wait_callback
+ else:
+ raise ValueError("please set 'PSYCOPG2_TEST_GREEN' to a valid value")
+
+ import psycopg2.extensions
+ psycopg2.extensions.set_wait_callback(wait_callback)
+
+# Construct a DSN to connect to the test database:
+dsn = 'dbname=%s' % dbname
+if dbhost is not None:
+ dsn += ' host=%s' % dbhost
+if dbport is not None:
+ dsn += ' port=%s' % dbport
+if dbuser is not None:
+ dsn += ' user=%s' % dbuser
+
+
diff --git a/tests/types_basic.py b/tests/types_basic.py
index 5bcff06..f14acc5 100755
--- a/tests/types_basic.py
+++ b/tests/types_basic.py
@@ -30,14 +30,14 @@ import sys
from testutils import unittest
import psycopg2
-import tests
+from testconfig import dsn
class TypesBasicTests(unittest.TestCase):
"""Test that all type conversions are working."""
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
def tearDown(self):
self.conn.close()
diff --git a/tests/types_extras.py b/tests/types_extras.py
index 6f77d97..9a2a2c2 100644
--- a/tests/types_extras.py
+++ b/tests/types_extras.py
@@ -24,7 +24,7 @@ from testutils import unittest
import psycopg2
import psycopg2.extras
-import tests
+from testconfig import dsn
def skip_if_no_uuid(f):
@@ -58,7 +58,7 @@ class TypesExtrasTests(unittest.TestCase):
"""Test that all type conversions are working."""
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
def tearDown(self):
self.conn.close()
@@ -145,7 +145,7 @@ def skip_if_no_hstore(f):
class HstoreTestCase(unittest.TestCase):
def setUp(self):
- self.conn = psycopg2.connect(tests.dsn)
+ self.conn = psycopg2.connect(dsn)
def tearDown(self):
self.conn.close()