diff options
| author | James Henstridge <james@jamesh.id.au> | 2008-01-16 05:14:24 +0000 |
|---|---|---|
| committer | James Henstridge <james@jamesh.id.au> | 2008-01-16 05:14:24 +0000 |
| commit | f64cbeda4606d8af3ca063736f313f81506c56bb (patch) | |
| tree | 0f0de0dddb735963bb348e2e6eae4371ecfe47b2 /tests/test_transaction.py | |
| parent | 46bf23caf47c3f08194e524312b5afbe0c449b70 (diff) | |
| download | psycopg2-f64cbeda4606d8af3ca063736f313f81506c56bb.tar.gz | |
* tests/test_transaction.py (DeadlockSerializationTestCase): port
over some tests for serialisation and deadlock errors,
demonstrating that TransactionRollbackError is generated.
(QueryCancelationTests): add a test to show that
QueryCanceledError is raised on statement timeouts.
* psycopg2da/adapter.py (_handle_psycopg_exception): rather than
checking exception messages, check for TransactionRollbackError.
* psycopg/pqpath.c (exception_from_sqlstate): return
TransactionRollbackError for 40xxx errors, and QueryCanceledError
for 57014 errors.
(pq_raise): If we are using an old server, use
TransactionRollbackError if the error message contains "could not
serialize" or "deadlock detected".
* psycopg/psycopgmodule.c (_psyco_connect_fill_exc): remove
function, since we no longer need to store pointers to the
exceptions in the connection. This also fixes a reference leak.
(psyco_connect): remove _psyco_connect_fill_exc() function call.
* psycopg/connection.h (connectionObject): remove exception
members from struct.
* psycopg/connection_type.c (connectionObject_getsets): modify the
exception attributes on the connection object from members to
getsets. This reduces the size of the struct.
* lib/extensions.py: import the two new extensions.
* psycopg/psycopgmodule.c (exctable): add new QueryCanceledError
and TransactionRollbackError exceptions.
Diffstat (limited to 'tests/test_transaction.py')
| -rwxr-xr-x | tests/test_transaction.py | 153 |
1 files changed, 152 insertions, 1 deletions
diff --git a/tests/test_transaction.py b/tests/test_transaction.py index 81fe54b..bd96ce4 100755 --- a/tests/test_transaction.py +++ b/tests/test_transaction.py @@ -1,6 +1,9 @@ #!/usr/bin/env python -import psycopg2 +import threading import unittest + +import psycopg2 +import psycopg2 import tests from psycopg2.extensions import ( @@ -69,6 +72,154 @@ class TransactionTestCase(unittest.TestCase): self.assertEqual(curs.fetchone()[0], 1) +class DeadlockSerializationTestCase(unittest.TestCase): + """Test deadlock and serialization failure errors.""" + + def connect(self): + conn = psycopg2.connect("dbname=%s" % tests.dbname) + conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE) + return conn + + def setUp(self): + self.conn = self.connect() + curs = self.conn.cursor() + # Drop table if it already exists + try: + curs.execute("DROP TABLE table1") + self.conn.commit() + except psycopg2.DatabaseError: + self.conn.rollback() + try: + curs.execute("DROP TABLE table2") + self.conn.commit() + except psycopg2.DatabaseError: + self.conn.rollback() + # Create sample data + curs.execute(""" + CREATE TABLE table1 ( + id int PRIMARY KEY, + name text) + """) + curs.execute("INSERT INTO table1 VALUES (1, 'hello')") + curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)") + self.conn.commit() + + def tearDown(self): + curs = self.conn.cursor() + curs.execute("DROP TABLE table1") + curs.execute("DROP TABLE table2") + self.conn.commit() + self.conn.close() + + def test_deadlock(self): + self.thread1_error = self.thread2_error = None + step1 = threading.Event() + step2 = threading.Event() + + def task1(): + try: + conn = self.connect() + curs = conn.cursor() + curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE") + step1.set() + step2.wait() + curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE") + except psycopg2.DatabaseError, exc: + self.thread1_error = exc + step1.set() + conn.close() + def task2(): + try: + conn = self.connect() + curs = conn.cursor() + step1.wait() + curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE") + step2.set() + curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE") + except psycopg2.DatabaseError, exc: + self.thread2_error = exc + step2.set() + conn.close() + + # Run the threads in parallel. The "step1" and "step2" events + # ensure that the two transactions overlap. + thread1 = threading.Thread(target=task1) + thread2 = threading.Thread(target=task2) + thread1.start() + thread2.start() + thread1.join() + thread2.join() + + # Exactly one of the threads should have failed with + # TransactionRollbackError: + self.assertFalse(self.thread1_error and self.thread2_error) + error = self.thread1_error or self.thread2_error + self.assertTrue(isinstance( + error, psycopg2.extensions.TransactionRollbackError)) + + def test_serialisation_failure(self): + self.thread1_error = self.thread2_error = None + step1 = threading.Event() + step2 = threading.Event() + + def task1(): + try: + conn = self.connect() + curs = conn.cursor() + curs.execute("SELECT name FROM table1 WHERE id = 1") + curs.fetchall() + step1.set() + step2.wait() + curs.execute("UPDATE table1 SET name='task1' WHERE id = 1") + conn.commit() + except psycopg2.DatabaseError, exc: + self.thread1_error = exc + step1.set() + conn.close() + def task2(): + try: + conn = self.connect() + curs = conn.cursor() + step1.wait() + curs.execute("UPDATE table1 SET name='task2' WHERE id = 1") + conn.commit() + except psycopg2.DatabaseError, exc: + self.thread2_error = exc + step2.set() + conn.close() + + # Run the threads in parallel. The "step1" and "step2" events + # ensure that the two transactions overlap. + thread1 = threading.Thread(target=task1) + thread2 = threading.Thread(target=task2) + thread1.start() + thread2.start() + thread1.join() + thread2.join() + + # Exactly one of the threads should have failed with + # TransactionRollbackError: + self.assertFalse(self.thread1_error and self.thread2_error) + error = self.thread1_error or self.thread2_error + self.assertTrue(isinstance( + error, psycopg2.extensions.TransactionRollbackError)) + + +class QueryCancelationTests(unittest.TestCase): + """Tests for query cancelation.""" + + def setUp(self): + self.conn = psycopg2.connect("dbname=%s" % tests.dbname) + self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE) + + def test_statement_timeout(self): + curs = self.conn.cursor() + # Set a low statement timeout, then sleep for a longer period. + curs.execute('SET statement_timeout TO 10') + self.assertRaises(psycopg2.extensions.QueryCanceledError, + curs.execute, 'SELECT pg_sleep(50)') + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) |
