summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rwxr-xr-xtests/test_transaction.py153
1 files changed, 152 insertions, 1 deletions
diff --git a/tests/test_transaction.py b/tests/test_transaction.py
index 81fe54b..bd96ce4 100755
--- a/tests/test_transaction.py
+++ b/tests/test_transaction.py
@@ -1,6 +1,9 @@
#!/usr/bin/env python
-import psycopg2
+import threading
import unittest
+
+import psycopg2
+import psycopg2
import tests
from psycopg2.extensions import (
@@ -69,6 +72,154 @@ class TransactionTestCase(unittest.TestCase):
self.assertEqual(curs.fetchone()[0], 1)
+class DeadlockSerializationTestCase(unittest.TestCase):
+ """Test deadlock and serialization failure errors."""
+
+ def connect(self):
+ conn = psycopg2.connect("dbname=%s" % tests.dbname)
+ conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
+ return conn
+
+ def setUp(self):
+ self.conn = self.connect()
+ curs = self.conn.cursor()
+ # Drop table if it already exists
+ try:
+ curs.execute("DROP TABLE table1")
+ self.conn.commit()
+ except psycopg2.DatabaseError:
+ self.conn.rollback()
+ try:
+ curs.execute("DROP TABLE table2")
+ self.conn.commit()
+ except psycopg2.DatabaseError:
+ self.conn.rollback()
+ # Create sample data
+ curs.execute("""
+ CREATE TABLE table1 (
+ id int PRIMARY KEY,
+ name text)
+ """)
+ curs.execute("INSERT INTO table1 VALUES (1, 'hello')")
+ curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)")
+ self.conn.commit()
+
+ def tearDown(self):
+ curs = self.conn.cursor()
+ curs.execute("DROP TABLE table1")
+ curs.execute("DROP TABLE table2")
+ self.conn.commit()
+ self.conn.close()
+
+ def test_deadlock(self):
+ self.thread1_error = self.thread2_error = None
+ step1 = threading.Event()
+ step2 = threading.Event()
+
+ def task1():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
+ step1.set()
+ step2.wait()
+ curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
+ except psycopg2.DatabaseError, exc:
+ self.thread1_error = exc
+ step1.set()
+ conn.close()
+ def task2():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ step1.wait()
+ curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
+ step2.set()
+ curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
+ except psycopg2.DatabaseError, exc:
+ self.thread2_error = exc
+ step2.set()
+ conn.close()
+
+ # Run the threads in parallel. The "step1" and "step2" events
+ # ensure that the two transactions overlap.
+ thread1 = threading.Thread(target=task1)
+ thread2 = threading.Thread(target=task2)
+ thread1.start()
+ thread2.start()
+ thread1.join()
+ thread2.join()
+
+ # Exactly one of the threads should have failed with
+ # TransactionRollbackError:
+ self.assertFalse(self.thread1_error and self.thread2_error)
+ error = self.thread1_error or self.thread2_error
+ self.assertTrue(isinstance(
+ error, psycopg2.extensions.TransactionRollbackError))
+
+ def test_serialisation_failure(self):
+ self.thread1_error = self.thread2_error = None
+ step1 = threading.Event()
+ step2 = threading.Event()
+
+ def task1():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ curs.execute("SELECT name FROM table1 WHERE id = 1")
+ curs.fetchall()
+ step1.set()
+ step2.wait()
+ curs.execute("UPDATE table1 SET name='task1' WHERE id = 1")
+ conn.commit()
+ except psycopg2.DatabaseError, exc:
+ self.thread1_error = exc
+ step1.set()
+ conn.close()
+ def task2():
+ try:
+ conn = self.connect()
+ curs = conn.cursor()
+ step1.wait()
+ curs.execute("UPDATE table1 SET name='task2' WHERE id = 1")
+ conn.commit()
+ except psycopg2.DatabaseError, exc:
+ self.thread2_error = exc
+ step2.set()
+ conn.close()
+
+ # Run the threads in parallel. The "step1" and "step2" events
+ # ensure that the two transactions overlap.
+ thread1 = threading.Thread(target=task1)
+ thread2 = threading.Thread(target=task2)
+ thread1.start()
+ thread2.start()
+ thread1.join()
+ thread2.join()
+
+ # Exactly one of the threads should have failed with
+ # TransactionRollbackError:
+ self.assertFalse(self.thread1_error and self.thread2_error)
+ error = self.thread1_error or self.thread2_error
+ self.assertTrue(isinstance(
+ error, psycopg2.extensions.TransactionRollbackError))
+
+
+class QueryCancelationTests(unittest.TestCase):
+ """Tests for query cancelation."""
+
+ def setUp(self):
+ self.conn = psycopg2.connect("dbname=%s" % tests.dbname)
+ self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
+
+ def test_statement_timeout(self):
+ curs = self.conn.cursor()
+ # Set a low statement timeout, then sleep for a longer period.
+ curs.execute('SET statement_timeout TO 10')
+ self.assertRaises(psycopg2.extensions.QueryCanceledError,
+ curs.execute, 'SELECT pg_sleep(50)')
+
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)