summaryrefslogtreecommitdiff
path: root/tests/test_transaction.py
diff options
context:
space:
mode:
authorJames Henstridge <james@jamesh.id.au>2008-01-16 05:14:24 +0000
committerJames Henstridge <james@jamesh.id.au>2008-01-16 05:14:24 +0000
commitf64cbeda4606d8af3ca063736f313f81506c56bb (patch)
tree0f0de0dddb735963bb348e2e6eae4371ecfe47b2 /tests/test_transaction.py
parent46bf23caf47c3f08194e524312b5afbe0c449b70 (diff)
downloadpsycopg2-f64cbeda4606d8af3ca063736f313f81506c56bb.tar.gz
* tests/test_transaction.py (DeadlockSerializationTestCase): port
over some tests for serialisation and deadlock errors, demonstrating that TransactionRollbackError is generated. (QueryCancelationTests): add a test to show that QueryCanceledError is raised on statement timeouts. * psycopg2da/adapter.py (_handle_psycopg_exception): rather than checking exception messages, check for TransactionRollbackError. * psycopg/pqpath.c (exception_from_sqlstate): return TransactionRollbackError for 40xxx errors, and QueryCanceledError for 57014 errors. (pq_raise): If we are using an old server, use TransactionRollbackError if the error message contains "could not serialize" or "deadlock detected". * psycopg/psycopgmodule.c (_psyco_connect_fill_exc): remove function, since we no longer need to store pointers to the exceptions in the connection. This also fixes a reference leak. (psyco_connect): remove _psyco_connect_fill_exc() function call. * psycopg/connection.h (connectionObject): remove exception members from struct. * psycopg/connection_type.c (connectionObject_getsets): modify the exception attributes on the connection object from members to getsets. This reduces the size of the struct. * lib/extensions.py: import the two new extensions. * psycopg/psycopgmodule.c (exctable): add new QueryCanceledError and TransactionRollbackError exceptions.
Diffstat (limited to 'tests/test_transaction.py')
-rwxr-xr-xtests/test_transaction.py153
1 files changed, 152 insertions, 1 deletions
diff --git a/tests/test_transaction.py b/tests/test_transaction.py
index 81fe54b..bd96ce4 100755
--- a/tests/test_transaction.py
+++ b/tests/test_transaction.py
@@ -1,6 +1,9 @@
#!/usr/bin/env python
-import psycopg2
+import threading
import unittest
+
+import psycopg2
+import psycopg2
import tests
from psycopg2.extensions import (
@@ -69,6 +72,154 @@ class TransactionTestCase(unittest.TestCase):
self.assertEqual(curs.fetchone()[0], 1)
+class DeadlockSerializationTestCase(unittest.TestCase):
+ """Test deadlock and serialization failure errors."""
+
+ def connect(self):
+ conn = psycopg2.connect("dbname=%s" % tests.dbname)
+ conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
+ return conn
+
+ def setUp(self):
+ self.conn = self.connect()
+ curs = self.conn.cursor()
+ # Drop table if it already exists
+ try:
+ curs.execute("DROP TABLE table1")
+ self.conn.commit()
+ except psycopg2.DatabaseError:
+ self.conn.rollback()
+ try:
+ curs.execute("DROP TABLE table2")
+ self.conn.commit()
+ except psycopg2.DatabaseError:
+ self.conn.rollback()
+ # Create sample data
+ curs.execute("""
+ CREATE TABLE table1 (
+ id int PRIMARY KEY,
+ name text)
+ """)
+ curs.execute("INSERT INTO table1 VALUES (1, 'hello')")
+ curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)")
+ self.conn.commit()
+
+ def tearDown(self):
+ curs = self.conn.cursor()
+ curs.execute("DROP TABLE table1")
+ curs.execute("DROP TABLE table2")
+ self.conn.commit()
+ self.conn.close()
+
+ def test_deadlock(self):
+ self.thread1_error = self.thread2_error = None
+ step1 = threading.Event()
+ step2 = threading.Event()
+
+ def task1():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
+ step1.set()
+ step2.wait()
+ curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
+ except psycopg2.DatabaseError, exc:
+ self.thread1_error = exc
+ step1.set()
+ conn.close()
+ def task2():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ step1.wait()
+ curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
+ step2.set()
+ curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
+ except psycopg2.DatabaseError, exc:
+ self.thread2_error = exc
+ step2.set()
+ conn.close()
+
+ # Run the threads in parallel. The "step1" and "step2" events
+ # ensure that the two transactions overlap.
+ thread1 = threading.Thread(target=task1)
+ thread2 = threading.Thread(target=task2)
+ thread1.start()
+ thread2.start()
+ thread1.join()
+ thread2.join()
+
+ # Exactly one of the threads should have failed with
+ # TransactionRollbackError:
+ self.assertFalse(self.thread1_error and self.thread2_error)
+ error = self.thread1_error or self.thread2_error
+ self.assertTrue(isinstance(
+ error, psycopg2.extensions.TransactionRollbackError))
+
+ def test_serialisation_failure(self):
+ self.thread1_error = self.thread2_error = None
+ step1 = threading.Event()
+ step2 = threading.Event()
+
+ def task1():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ curs.execute("SELECT name FROM table1 WHERE id = 1")
+ curs.fetchall()
+ step1.set()
+ step2.wait()
+ curs.execute("UPDATE table1 SET name='task1' WHERE id = 1")
+ conn.commit()
+ except psycopg2.DatabaseError, exc:
+ self.thread1_error = exc
+ step1.set()
+ conn.close()
+ def task2():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ step1.wait()
+ curs.execute("UPDATE table1 SET name='task2' WHERE id = 1")
+ conn.commit()
+ except psycopg2.DatabaseError, exc:
+ self.thread2_error = exc
+ step2.set()
+ conn.close()
+
+ # Run the threads in parallel. The "step1" and "step2" events
+ # ensure that the two transactions overlap.
+ thread1 = threading.Thread(target=task1)
+ thread2 = threading.Thread(target=task2)
+ thread1.start()
+ thread2.start()
+ thread1.join()
+ thread2.join()
+
+ # Exactly one of the threads should have failed with
+ # TransactionRollbackError:
+ self.assertFalse(self.thread1_error and self.thread2_error)
+ error = self.thread1_error or self.thread2_error
+ self.assertTrue(isinstance(
+ error, psycopg2.extensions.TransactionRollbackError))
+
+
+class QueryCancelationTests(unittest.TestCase):
+ """Tests for query cancelation."""
+
+ def setUp(self):
+ self.conn = psycopg2.connect("dbname=%s" % tests.dbname)
+ self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
+
+ def test_statement_timeout(self):
+ curs = self.conn.cursor()
+ # Set a low statement timeout, then sleep for a longer period.
+ curs.execute('SET statement_timeout TO 10')
+ self.assertRaises(psycopg2.extensions.QueryCanceledError,
+ curs.execute, 'SELECT pg_sleep(50)')
+
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)