diff options
| author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-10-10 19:15:33 +0100 |
|---|---|---|
| committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-11-05 09:34:48 +0000 |
| commit | 397eace051aaf633a8facbe92d04e2194c908e7d (patch) | |
| tree | a6e9c9f2aa779e7782d0ce25c93340af7d9c916a | |
| parent | 56c02b0f94c3922f7d998b8abf8616d1ee8160c9 (diff) | |
| download | psycopg2-397eace051aaf633a8facbe92d04e2194c908e7d.tar.gz | |
Added tests for two-phase commit/rollback.
| -rw-r--r-- | tests/test_connection.py | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/tests/test_connection.py b/tests/test_connection.py index 6c6dddc..f00a4c9 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -69,6 +69,7 @@ class ConnectionTests(unittest.TestCase): class ConnectionTwoPhaseTests(unittest.TestCase): def setUp(self): + self.make_test_table() self.clear_test_xacts() def tearDown(self): @@ -87,9 +88,172 @@ class ConnectionTwoPhaseTests(unittest.TestCase): cur.execute("rollback prepared %s;", (gid,)) cnn.close() + def make_test_table(self): + cnn = self.connect() + cur = cnn.cursor() + cur.execute("DROP TABLE IF EXISTS test_tpc;") + cur.execute("CREATE TABLE test_tpc (data text);") + cnn.commit() + cnn.close() + + def count_xacts(self): + """Return the number of prepared xacts currently in the test db.""" + cnn = self.connect() + cur = cnn.cursor() + cur.execute(""" + select count(*) from pg_prepared_xacts + where database = %s;""", + (tests.dbname,)) + rv = cur.fetchone()[0] + cnn.close() + return rv + + def count_test_records(self): + """Return the number of records in the test table.""" + cnn = self.connect() + cur = cnn.cursor() + cur.execute("select count(*) from test_tpc;") + rv = cur.fetchone()[0] + cnn.close() + return rv + def connect(self): return psycopg2.connect(tests.dsn) + def test_tpc_commit(self): + cnn = self.connect() + xid = cnn.xid(1, "gtrid", "bqual") + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + + cnn.tpc_begin(xid) + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + + cur = cnn.cursor() + cur.execute("insert into test_tpc values ('test_tpc_commit');") + self.assertEqual(0, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + cnn.tpc_prepare() + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_PREPARED) + self.assertEqual(1, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + cnn.tpc_commit() + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(0, self.count_xacts()) + self.assertEqual(1, self.count_test_records()) + + def test_tpc_commit_one_phase(self): + cnn = self.connect() + xid = cnn.xid(1, "gtrid", "bqual") + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + + cnn.tpc_begin(xid) + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + + cur = cnn.cursor() + cur.execute("insert into test_tpc values ('test_tpc_commit_1p');") + self.assertEqual(0, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + cnn.tpc_commit() + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(0, self.count_xacts()) + self.assertEqual(1, self.count_test_records()) + + def test_tpc_commit_recovered(self): + cnn = self.connect() + xid = cnn.xid(1, "gtrid", "bqual") + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + + cnn.tpc_begin(xid) + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + + cur = cnn.cursor() + cur.execute("insert into test_tpc values ('test_tpc_commit_rec');") + self.assertEqual(0, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + cnn.tpc_prepare() + cnn.close() + self.assertEqual(1, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + cnn = self.connect() + xid = cnn.xid(1, "gtrid", "bqual") + cnn.tpc_commit(xid) + + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(0, self.count_xacts()) + self.assertEqual(1, self.count_test_records()) + + def test_tpc_rollback(self): + cnn = self.connect() + xid = cnn.xid(1, "gtrid", "bqual") + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + + cnn.tpc_begin(xid) + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + + cur = cnn.cursor() + cur.execute("insert into test_tpc values ('test_tpc_rollback');") + self.assertEqual(0, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + cnn.tpc_prepare() + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_PREPARED) + self.assertEqual(1, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + cnn.tpc_rollback() + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(0, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + def test_tpc_rollback_one_phase(self): + cnn = self.connect() + xid = cnn.xid(1, "gtrid", "bqual") + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + + cnn.tpc_begin(xid) + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + + cur = cnn.cursor() + cur.execute("insert into test_tpc values ('test_tpc_rollback_1p');") + self.assertEqual(0, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + cnn.tpc_rollback() + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(0, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + def test_tpc_rollback_recovered(self): + cnn = self.connect() + xid = cnn.xid(1, "gtrid", "bqual") + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + + cnn.tpc_begin(xid) + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) + + cur = cnn.cursor() + cur.execute("insert into test_tpc values ('test_tpc_commit_rec');") + self.assertEqual(0, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + cnn.tpc_prepare() + cnn.close() + self.assertEqual(1, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + + cnn = self.connect() + xid = cnn.xid(1, "gtrid", "bqual") + cnn.tpc_rollback(xid) + + self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) + self.assertEqual(0, self.count_xacts()) + self.assertEqual(0, self.count_test_records()) + def test_status_after_recover(self): cnn = self.connect() self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status) |
