diff options
| author | James Henstridge <james@jamesh.id.au> | 2008-01-16 05:14:24 +0000 | 
|---|---|---|
| committer | James Henstridge <james@jamesh.id.au> | 2008-01-16 05:14:24 +0000 | 
| commit | f64cbeda4606d8af3ca063736f313f81506c56bb (patch) | |
| tree | 0f0de0dddb735963bb348e2e6eae4371ecfe47b2 /tests/test_transaction.py | |
| parent | 46bf23caf47c3f08194e524312b5afbe0c449b70 (diff) | |
| download | psycopg2-f64cbeda4606d8af3ca063736f313f81506c56bb.tar.gz | |
	* tests/test_transaction.py (DeadlockSerializationTestCase): port
	over some tests for serialisation and deadlock errors,
	demonstrating that TransactionRollbackError is generated.
	(QueryCancelationTests): add a test to show that
	QueryCanceledError is raised on statement timeouts.
	* psycopg2da/adapter.py (_handle_psycopg_exception): rather than
	checking exception messages, check for TransactionRollbackError.
	* psycopg/pqpath.c (exception_from_sqlstate): return
	TransactionRollbackError for 40xxx errors, and QueryCanceledError
	for 57014 errors.
	(pq_raise): If we are using an old server, use
	TransactionRollbackError if the error message contains "could not
	serialize" or "deadlock detected".
	* psycopg/psycopgmodule.c (_psyco_connect_fill_exc): remove
	function, since we no longer need to store pointers to the
	exceptions in the connection.  This also fixes a reference leak.
	(psyco_connect): remove _psyco_connect_fill_exc() function call.
	* psycopg/connection.h (connectionObject): remove exception
	members from struct.
	* psycopg/connection_type.c (connectionObject_getsets): modify the
	exception attributes on the connection object from members to
	getsets.  This reduces the size of the struct.
	* lib/extensions.py: import the two new extensions.
	* psycopg/psycopgmodule.c (exctable): add new QueryCanceledError
	and TransactionRollbackError exceptions.
Diffstat (limited to 'tests/test_transaction.py')
| -rwxr-xr-x | tests/test_transaction.py | 153 | 
1 files changed, 152 insertions, 1 deletions
| diff --git a/tests/test_transaction.py b/tests/test_transaction.py index 81fe54b..bd96ce4 100755 --- a/tests/test_transaction.py +++ b/tests/test_transaction.py @@ -1,6 +1,9 @@  #!/usr/bin/env python -import psycopg2 +import threading  import unittest + +import psycopg2 +import psycopg2  import tests  from psycopg2.extensions import ( @@ -69,6 +72,154 @@ class TransactionTestCase(unittest.TestCase):          self.assertEqual(curs.fetchone()[0], 1) +class DeadlockSerializationTestCase(unittest.TestCase): +    """Test deadlock and serialization failure errors.""" + +    def connect(self): +        conn = psycopg2.connect("dbname=%s" % tests.dbname) +        conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE) +        return conn + +    def setUp(self): +        self.conn = self.connect() +        curs = self.conn.cursor() +        # Drop table if it already exists +        try: +            curs.execute("DROP TABLE table1") +            self.conn.commit() +        except psycopg2.DatabaseError: +            self.conn.rollback() +        try: +            curs.execute("DROP TABLE table2") +            self.conn.commit() +        except psycopg2.DatabaseError: +            self.conn.rollback() +        # Create sample data +        curs.execute(""" +            CREATE TABLE table1 ( +                id int PRIMARY KEY, +                name text) +        """) +        curs.execute("INSERT INTO table1 VALUES (1, 'hello')") +        curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)") +        self.conn.commit() + +    def tearDown(self): +        curs = self.conn.cursor() +        curs.execute("DROP TABLE table1") +        curs.execute("DROP TABLE table2") +        self.conn.commit() +        self.conn.close() + +    def test_deadlock(self): +        self.thread1_error = self.thread2_error = None +        step1 = threading.Event() +        step2 = threading.Event() + +        def task1(): +            try: +                conn = self.connect() +                curs = conn.cursor() +                curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE") +                step1.set() +                step2.wait() +                curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE") +            except psycopg2.DatabaseError, exc: +                self.thread1_error = exc +                step1.set() +            conn.close() +        def task2(): +            try: +                conn = self.connect() +                curs = conn.cursor() +                step1.wait() +                curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE") +                step2.set() +                curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE") +            except psycopg2.DatabaseError, exc: +                self.thread2_error = exc +                step2.set() +            conn.close() + +        # Run the threads in parallel.  The "step1" and "step2" events +        # ensure that the two transactions overlap. +        thread1 = threading.Thread(target=task1) +        thread2 = threading.Thread(target=task2) +        thread1.start() +        thread2.start() +        thread1.join() +        thread2.join() + +        # Exactly one of the threads should have failed with +        # TransactionRollbackError: +        self.assertFalse(self.thread1_error and self.thread2_error) +        error = self.thread1_error or self.thread2_error +        self.assertTrue(isinstance( +                error, psycopg2.extensions.TransactionRollbackError)) + +    def test_serialisation_failure(self): +        self.thread1_error = self.thread2_error = None +        step1 = threading.Event() +        step2 = threading.Event() + +        def task1(): +            try: +                conn = self.connect() +                curs = conn.cursor() +                curs.execute("SELECT name FROM table1 WHERE id = 1") +                curs.fetchall() +                step1.set() +                step2.wait() +                curs.execute("UPDATE table1 SET name='task1' WHERE id = 1") +                conn.commit() +            except psycopg2.DatabaseError, exc: +                self.thread1_error = exc +                step1.set() +            conn.close() +        def task2(): +            try: +                conn = self.connect() +                curs = conn.cursor() +                step1.wait() +                curs.execute("UPDATE table1 SET name='task2' WHERE id = 1") +                conn.commit() +            except psycopg2.DatabaseError, exc: +                self.thread2_error = exc +            step2.set() +            conn.close() + +        # Run the threads in parallel.  The "step1" and "step2" events +        # ensure that the two transactions overlap. +        thread1 = threading.Thread(target=task1) +        thread2 = threading.Thread(target=task2) +        thread1.start() +        thread2.start() +        thread1.join() +        thread2.join() + +        # Exactly one of the threads should have failed with +        # TransactionRollbackError: +        self.assertFalse(self.thread1_error and self.thread2_error) +        error = self.thread1_error or self.thread2_error +        self.assertTrue(isinstance( +                error, psycopg2.extensions.TransactionRollbackError)) + + +class QueryCancelationTests(unittest.TestCase): +    """Tests for query cancelation.""" + +    def setUp(self): +        self.conn = psycopg2.connect("dbname=%s" % tests.dbname) +        self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE) + +    def test_statement_timeout(self): +        curs = self.conn.cursor() +        # Set a low statement timeout, then sleep for a longer period. +        curs.execute('SET statement_timeout TO 10') +        self.assertRaises(psycopg2.extensions.QueryCanceledError, +                          curs.execute, 'SELECT pg_sleep(50)') + +  def test_suite():      return unittest.TestLoader().loadTestsFromName(__name__) | 
