diff options
Diffstat (limited to 'tests/test_transaction.py')
| -rwxr-xr-x | tests/test_transaction.py | 153 |
1 files changed, 152 insertions, 1 deletions
diff --git a/tests/test_transaction.py b/tests/test_transaction.py index 81fe54b..bd96ce4 100755 --- a/tests/test_transaction.py +++ b/tests/test_transaction.py @@ -1,6 +1,9 @@ #!/usr/bin/env python -import psycopg2 +import threading import unittest + +import psycopg2 +import psycopg2 import tests from psycopg2.extensions import ( @@ -69,6 +72,154 @@ class TransactionTestCase(unittest.TestCase): self.assertEqual(curs.fetchone()[0], 1) +class DeadlockSerializationTestCase(unittest.TestCase): + """Test deadlock and serialization failure errors.""" + + def connect(self): + conn = psycopg2.connect("dbname=%s" % tests.dbname) + conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE) + return conn + + def setUp(self): + self.conn = self.connect() + curs = self.conn.cursor() + # Drop table if it already exists + try: + curs.execute("DROP TABLE table1") + self.conn.commit() + except psycopg2.DatabaseError: + self.conn.rollback() + try: + curs.execute("DROP TABLE table2") + self.conn.commit() + except psycopg2.DatabaseError: + self.conn.rollback() + # Create sample data + curs.execute(""" + CREATE TABLE table1 ( + id int PRIMARY KEY, + name text) + """) + curs.execute("INSERT INTO table1 VALUES (1, 'hello')") + curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)") + self.conn.commit() + + def tearDown(self): + curs = self.conn.cursor() + curs.execute("DROP TABLE table1") + curs.execute("DROP TABLE table2") + self.conn.commit() + self.conn.close() + + def test_deadlock(self): + self.thread1_error = self.thread2_error = None + step1 = threading.Event() + step2 = threading.Event() + + def task1(): + try: + conn = self.connect() + curs = conn.cursor() + curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE") + step1.set() + step2.wait() + curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE") + except psycopg2.DatabaseError, exc: + self.thread1_error = exc + step1.set() + conn.close() + def task2(): + try: + conn = self.connect() + curs = conn.cursor() + step1.wait() + curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE") + step2.set() + curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE") + except psycopg2.DatabaseError, exc: + self.thread2_error = exc + step2.set() + conn.close() + + # Run the threads in parallel. The "step1" and "step2" events + # ensure that the two transactions overlap. + thread1 = threading.Thread(target=task1) + thread2 = threading.Thread(target=task2) + thread1.start() + thread2.start() + thread1.join() + thread2.join() + + # Exactly one of the threads should have failed with + # TransactionRollbackError: + self.assertFalse(self.thread1_error and self.thread2_error) + error = self.thread1_error or self.thread2_error + self.assertTrue(isinstance( + error, psycopg2.extensions.TransactionRollbackError)) + + def test_serialisation_failure(self): + self.thread1_error = self.thread2_error = None + step1 = threading.Event() + step2 = threading.Event() + + def task1(): + try: + conn = self.connect() + curs = conn.cursor() + curs.execute("SELECT name FROM table1 WHERE id = 1") + curs.fetchall() + step1.set() + step2.wait() + curs.execute("UPDATE table1 SET name='task1' WHERE id = 1") + conn.commit() + except psycopg2.DatabaseError, exc: + self.thread1_error = exc + step1.set() + conn.close() + def task2(): + try: + conn = self.connect() + curs = conn.cursor() + step1.wait() + curs.execute("UPDATE table1 SET name='task2' WHERE id = 1") + conn.commit() + except psycopg2.DatabaseError, exc: + self.thread2_error = exc + step2.set() + conn.close() + + # Run the threads in parallel. The "step1" and "step2" events + # ensure that the two transactions overlap. + thread1 = threading.Thread(target=task1) + thread2 = threading.Thread(target=task2) + thread1.start() + thread2.start() + thread1.join() + thread2.join() + + # Exactly one of the threads should have failed with + # TransactionRollbackError: + self.assertFalse(self.thread1_error and self.thread2_error) + error = self.thread1_error or self.thread2_error + self.assertTrue(isinstance( + error, psycopg2.extensions.TransactionRollbackError)) + + +class QueryCancelationTests(unittest.TestCase): + """Tests for query cancelation.""" + + def setUp(self): + self.conn = psycopg2.connect("dbname=%s" % tests.dbname) + self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE) + + def test_statement_timeout(self): + curs = self.conn.cursor() + # Set a low statement timeout, then sleep for a longer period. + curs.execute('SET statement_timeout TO 10') + self.assertRaises(psycopg2.extensions.QueryCanceledError, + curs.execute, 'SELECT pg_sleep(50)') + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) |
