summaryrefslogtreecommitdiff
path: root/tests/test_module.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_module.py')
-rwxr-xr-xtests/test_module.py128
1 files changed, 119 insertions, 9 deletions
diff --git a/tests/test_module.py b/tests/test_module.py
index 4083c36..e6d1a3d 100755
--- a/tests/test_module.py
+++ b/tests/test_module.py
@@ -22,8 +22,8 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
-from testutils import unittest, skip_before_python
-from testconfig import dsn
+from testutils import unittest, skip_before_python, skip_before_postgres
+from testutils import ConnectingTestCase, skip_copy_if_green
import psycopg2
@@ -136,13 +136,7 @@ class ConnectTestCase(unittest.TestCase):
psycopg2.connect, 'dbname=foo', no_such_param='meh')
-class ExceptionsTestCase(unittest.TestCase):
- def setUp(self):
- self.conn = psycopg2.connect(dsn)
-
- def tearDown(self):
- self.conn.close()
-
+class ExceptionsTestCase(ConnectingTestCase):
def test_attributes(self):
cur = self.conn.cursor()
try:
@@ -154,6 +148,122 @@ class ExceptionsTestCase(unittest.TestCase):
self.assert_(e.pgerror)
self.assert_(e.cursor is cur)
+ def test_diagnostics_attributes(self):
+ cur = self.conn.cursor()
+ try:
+ cur.execute("select * from nonexist")
+ except psycopg2.Error, exc:
+ e = exc
+
+ diag = e.diag
+ self.assert_(isinstance(diag, psycopg2.extensions.Diagnostics))
+ for attr in [
+ 'column_name', 'constraint_name', 'context', 'datatype_name',
+ 'internal_position', 'internal_query', 'message_detail',
+ 'message_hint', 'message_primary', 'schema_name', 'severity',
+ 'source_file', 'source_function', 'source_line', 'sqlstate',
+ 'statement_position', 'table_name', ]:
+ v = getattr(diag, attr)
+ if v is not None:
+ self.assert_(isinstance(v, str))
+
+ def test_diagnostics_values(self):
+ cur = self.conn.cursor()
+ try:
+ cur.execute("select * from nonexist")
+ except psycopg2.Error, exc:
+ e = exc
+
+ self.assertEqual(e.diag.sqlstate, '42P01')
+ self.assertEqual(e.diag.severity, 'ERROR')
+
+ def test_diagnostics_life(self):
+ import gc
+ from weakref import ref
+
+ def tmp():
+ cur = self.conn.cursor()
+ try:
+ cur.execute("select * from nonexist")
+ except psycopg2.Error, exc:
+ return cur, exc
+
+ cur, e = tmp()
+ diag = e.diag
+ w = ref(cur)
+
+ del e, cur
+ gc.collect()
+ assert(w() is not None)
+
+ self.assertEqual(diag.sqlstate, '42P01')
+
+ del diag
+ gc.collect()
+ assert(w() is None)
+
+ @skip_copy_if_green
+ def test_diagnostics_copy(self):
+ from StringIO import StringIO
+ f = StringIO()
+ cur = self.conn.cursor()
+ try:
+ cur.copy_to(f, 'nonexist')
+ except psycopg2.Error, exc:
+ diag = exc.diag
+
+ self.assertEqual(diag.sqlstate, '42P01')
+
+ def test_diagnostics_independent(self):
+ cur = self.conn.cursor()
+ try:
+ cur.execute("l'acqua e' poca e 'a papera nun galleggia")
+ except Exception, exc:
+ diag1 = exc.diag
+
+ self.conn.rollback()
+
+ try:
+ cur.execute("select level from water where ducks > 1")
+ except psycopg2.Error, exc:
+ diag2 = exc.diag
+
+ self.assertEqual(diag1.sqlstate, '42601')
+ self.assertEqual(diag2.sqlstate, '42P01')
+
+ def test_diagnostics_from_commit(self):
+ cur = self.conn.cursor()
+ cur.execute("""
+ create temp table test_deferred (
+ data int primary key,
+ ref int references test_deferred (data)
+ deferrable initially deferred)
+ """)
+ cur.execute("insert into test_deferred values (1,2)")
+ try:
+ self.conn.commit()
+ except psycopg2.Error, exc:
+ e = exc
+ self.assertEqual(e.diag.sqlstate, '23503')
+
+ @skip_before_postgres(9, 3)
+ def test_9_3_diagnostics(self):
+ cur = self.conn.cursor()
+ cur.execute("""
+ create temp table test_exc (
+ data int constraint chk_eq1 check (data = 1)
+ )""")
+ try:
+ cur.execute("insert into test_exc values(2)")
+ except psycopg2.Error, exc:
+ e = exc
+ self.assertEqual(e.pgcode, '23514')
+ self.assertEqual(e.diag.schema_name[:7], "pg_temp")
+ self.assertEqual(e.diag.table_name, "test_exc")
+ self.assertEqual(e.diag.column_name, None)
+ self.assertEqual(e.diag.constraint_name, "chk_eq1")
+ self.assertEqual(e.diag.datatype_name, None)
+
@skip_before_python(2, 5)
def test_pickle(self):
import pickle