summaryrefslogtreecommitdiff
path: root/tests/test_connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_connection.py')
-rwxr-xr-xtests/test_connection.py146
1 files changed, 145 insertions, 1 deletions
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 89b104f..18c9277 100755
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -25,7 +25,8 @@
import os
import time
import threading
-from testutils import unittest, decorate_all_tests, skip_before_postgres
+from testutils import unittest, decorate_all_tests
+from testutils import skip_before_postgres, skip_after_postgres
from operator import attrgetter
import psycopg2
@@ -707,6 +708,149 @@ from testutils import skip_if_tpc_disabled
decorate_all_tests(ConnectionTwoPhaseTests, skip_if_tpc_disabled)
+class TransactionControlTests(unittest.TestCase):
+ def setUp(self):
+ self.conn = psycopg2.connect(dsn)
+
+ def tearDown(self):
+ if not self.conn.closed:
+ self.conn.close()
+
+ def test_not_in_transaction(self):
+ cur = self.conn.cursor()
+ cur.execute("select 1")
+ self.assertRaises(psycopg2.ProgrammingError,
+ self.conn.set_transaction,
+ psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+
+ def test_set_isolation_level(self):
+ cur = self.conn.cursor()
+ self.conn.set_transaction(
+ psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.set_transaction(
+ psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ)
+ cur.execute("SHOW default_transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(cur.fetchone()[0], 'repeatable read')
+ else:
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.set_transaction(
+ isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ self.conn.set_transaction(
+ isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED)
+ cur.execute("SHOW default_transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(cur.fetchone()[0], 'read uncommitted')
+ else:
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ def test_set_isolation_level_str(self):
+ cur = self.conn.cursor()
+ self.conn.set_transaction("serializable")
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.set_transaction("repeatable read")
+ cur.execute("SHOW default_transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(cur.fetchone()[0], 'repeatable read')
+ else:
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.set_transaction("read committed")
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ self.conn.set_transaction("read uncommitted")
+ cur.execute("SHOW default_transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(cur.fetchone()[0], 'read uncommitted')
+ else:
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ def test_bad_isolation_level(self):
+ self.assertRaises(ValueError, self.conn.set_transaction, 0)
+ self.assertRaises(ValueError, self.conn.set_transaction, 5)
+ self.assertRaises(ValueError, self.conn.set_transaction, 'whatever')
+
+ def test_set_read_only(self):
+ cur = self.conn.cursor()
+ self.conn.set_transaction(readonly=True)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+
+ cur = self.conn.cursor()
+ self.conn.set_transaction(readonly=None)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+
+ self.conn.set_transaction(readonly=False)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'off')
+ self.conn.rollback()
+
+ def test_set_default(self):
+ cur = self.conn.cursor()
+ cur.execute("SHOW default_transaction_isolation;")
+ default_isolevel = cur.fetchone()[0]
+ cur.execute("SHOW default_transaction_read_only;")
+ default_readonly = cur.fetchone()[0]
+ self.conn.rollback()
+
+ self.conn.set_transaction(isolation_level='serializable', readonly=True)
+ self.conn.set_transaction(isolation_level='default', readonly='default')
+
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], default_isolevel)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], default_readonly)
+
+ @skip_before_postgres(9, 1)
+ def test_set_deferrable(self):
+ cur = self.conn.cursor()
+ self.conn.set_transaction(readonly=True, deferrable=True)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ cur.execute("SHOW default_transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+ cur.execute("SHOW default_transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+
+ self.conn.set_transaction(deferrable=False)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ cur.execute("SHOW default_transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'off')
+ self.conn.rollback()
+
+ @skip_after_postgres(9, 1)
+ def test_set_deferrable_error(self):
+ self.assertRaises(psycopg2.ProgrammingError,
+ self.conn.set_transaction, readonly=True, deferrable=True)
+
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)