diff options
Diffstat (limited to 'tests/test_connection.py')
-rwxr-xr-x | tests/test_connection.py | 146 |
1 files changed, 145 insertions, 1 deletions
diff --git a/tests/test_connection.py b/tests/test_connection.py index 89b104f..18c9277 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -25,7 +25,8 @@ import os import time import threading -from testutils import unittest, decorate_all_tests, skip_before_postgres +from testutils import unittest, decorate_all_tests +from testutils import skip_before_postgres, skip_after_postgres from operator import attrgetter import psycopg2 @@ -707,6 +708,149 @@ from testutils import skip_if_tpc_disabled decorate_all_tests(ConnectionTwoPhaseTests, skip_if_tpc_disabled) +class TransactionControlTests(unittest.TestCase): + def setUp(self): + self.conn = psycopg2.connect(dsn) + + def tearDown(self): + if not self.conn.closed: + self.conn.close() + + def test_not_in_transaction(self): + cur = self.conn.cursor() + cur.execute("select 1") + self.assertRaises(psycopg2.ProgrammingError, + self.conn.set_transaction, + psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + + def test_set_isolation_level(self): + cur = self.conn.cursor() + self.conn.set_transaction( + psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.set_transaction( + psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ) + cur.execute("SHOW default_transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(cur.fetchone()[0], 'repeatable read') + else: + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.set_transaction( + isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + self.conn.set_transaction( + isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED) + cur.execute("SHOW default_transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(cur.fetchone()[0], 'read uncommitted') + else: + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + def test_set_isolation_level_str(self): + cur = self.conn.cursor() + self.conn.set_transaction("serializable") + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.set_transaction("repeatable read") + cur.execute("SHOW default_transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(cur.fetchone()[0], 'repeatable read') + else: + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.set_transaction("read committed") + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + self.conn.set_transaction("read uncommitted") + cur.execute("SHOW default_transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(cur.fetchone()[0], 'read uncommitted') + else: + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + def test_bad_isolation_level(self): + self.assertRaises(ValueError, self.conn.set_transaction, 0) + self.assertRaises(ValueError, self.conn.set_transaction, 5) + self.assertRaises(ValueError, self.conn.set_transaction, 'whatever') + + def test_set_read_only(self): + cur = self.conn.cursor() + self.conn.set_transaction(readonly=True) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + + cur = self.conn.cursor() + self.conn.set_transaction(readonly=None) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + + self.conn.set_transaction(readonly=False) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'off') + self.conn.rollback() + + def test_set_default(self): + cur = self.conn.cursor() + cur.execute("SHOW default_transaction_isolation;") + default_isolevel = cur.fetchone()[0] + cur.execute("SHOW default_transaction_read_only;") + default_readonly = cur.fetchone()[0] + self.conn.rollback() + + self.conn.set_transaction(isolation_level='serializable', readonly=True) + self.conn.set_transaction(isolation_level='default', readonly='default') + + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], default_isolevel) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], default_readonly) + + @skip_before_postgres(9, 1) + def test_set_deferrable(self): + cur = self.conn.cursor() + self.conn.set_transaction(readonly=True, deferrable=True) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + cur.execute("SHOW default_transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + cur.execute("SHOW default_transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + + self.conn.set_transaction(deferrable=False) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + cur.execute("SHOW default_transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'off') + self.conn.rollback() + + @skip_after_postgres(9, 1) + def test_set_deferrable_error(self): + self.assertRaises(psycopg2.ProgrammingError, + self.conn.set_transaction, readonly=True, deferrable=True) + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) |