summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-10-11 13:03:37 +0100
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-11-05 09:34:48 +0000
commit56c02b0f94c3922f7d998b8abf8616d1ee8160c9 (patch)
tree71a8041fda28bfed6f15893774d4f244935f6c3c
parent09983db6ed3aca1856e550c8a32c51831cd3c618 (diff)
downloadpsycopg2-56c02b0f94c3922f7d998b8abf8616d1ee8160c9.tar.gz
Added tpc_recover method.
-rw-r--r--psycopg/connection.h1
-rw-r--r--psycopg/connection_int.c33
-rw-r--r--psycopg/connection_type.c18
-rw-r--r--psycopg/xid.h5
-rw-r--r--psycopg/xid_type.c165
-rw-r--r--tests/test_connection.py69
6 files changed, 283 insertions, 8 deletions
diff --git a/psycopg/connection.h b/psycopg/connection.h
index 7d2f66c..85e2d26 100644
--- a/psycopg/connection.h
+++ b/psycopg/connection.h
@@ -135,6 +135,7 @@ HIDDEN int conn_poll(connectionObject *self);
HIDDEN int conn_tpc_begin(connectionObject *self, XidObject *xid);
HIDDEN int conn_tpc_command(connectionObject *self,
const char *cmd, XidObject *xid);
+HIDDEN PyObject *conn_tpc_recover(connectionObject *self);
/* exception-raising macros */
#define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index 6024a2b..d92d53f 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -957,3 +957,36 @@ conn_tpc_command(connectionObject *self, const char *cmd, XidObject *xid)
return rv;
}
+/* conn_tpc_recover -- return a list of pending TPC Xid */
+
+PyObject *
+conn_tpc_recover(connectionObject *self)
+{
+ int status;
+ PyObject *xids = NULL;
+ PyObject *rv = NULL;
+ PyObject *tmp;
+
+ /* store the status to restore it. */
+ status = self->status;
+
+ if (!(xids = xid_recover((PyObject *)self))) { goto exit; }
+
+ if (status == CONN_STATUS_READY && self->status == CONN_STATUS_BEGIN) {
+ /* recover began a transaction: let's abort it. */
+ if (!(tmp = PyObject_CallMethod((PyObject *)self, "rollback", NULL))) {
+ goto exit;
+ }
+ Py_DECREF(tmp);
+ }
+
+ /* all fine */
+ rv = xids;
+ xids = NULL;
+
+exit:
+ Py_XDECREF(xids);
+
+ return rv;
+
+}
diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c
index 26e8445..48f2780 100644
--- a/psycopg/connection_type.c
+++ b/psycopg/connection_type.c
@@ -381,6 +381,22 @@ psyco_conn_tpc_rollback(connectionObject *self, PyObject *args)
conn_rollback, "ROLLBACK PREPARED");
}
+#define psyco_conn_tpc_recover_doc \
+"tpc_recover() -- returns a list of pending transaction IDs."
+
+static PyObject *
+psyco_conn_tpc_recover(connectionObject *self, PyObject *args)
+{
+ EXC_IF_CONN_CLOSED(self);
+ EXC_IF_CONN_ASYNC(self, tpc_recover);
+ EXC_IF_TPC_PREPARED(self, tpc_recover);
+
+ if (!PyArg_ParseTuple(args, "")) { return NULL; }
+
+ return conn_tpc_recover(self);
+}
+
+
#ifdef PSYCOPG_EXTENSIONS
/* set_isolation_level method - switch connection isolation level */
@@ -720,6 +736,8 @@ static struct PyMethodDef connectionObject_methods[] = {
METH_VARARGS, psyco_conn_tpc_commit_doc},
{"tpc_rollback", (PyCFunction)psyco_conn_tpc_rollback,
METH_VARARGS, psyco_conn_tpc_rollback_doc},
+ {"tpc_recover", (PyCFunction)psyco_conn_tpc_recover,
+ METH_VARARGS, psyco_conn_tpc_recover_doc},
#ifdef PSYCOPG_EXTENSIONS
{"set_isolation_level", (PyCFunction)psyco_conn_set_isolation_level,
METH_VARARGS, psyco_conn_set_isolation_level_doc},
diff --git a/psycopg/xid.h b/psycopg/xid.h
index eec4c7e..7944634 100644
--- a/psycopg/xid.h
+++ b/psycopg/xid.h
@@ -31,6 +31,9 @@
#include "psycopg/config.h"
+/* value for the format_id when the xid doesn't follow the XA standard. */
+#define XID_UNPARSED (-2)
+
extern HIDDEN PyTypeObject XidType;
typedef struct {
@@ -51,6 +54,8 @@ typedef struct {
} XidObject;
HIDDEN XidObject *xid_ensure(PyObject *oxid);
+HIDDEN XidObject *xid_from_string(PyObject *s);
HIDDEN char *xid_get_tid(XidObject *self);
+HIDDEN PyObject *xid_recover(PyObject *conn);
#endif /* PSYCOPG_XID_H */
diff --git a/psycopg/xid_type.c b/psycopg/xid_type.c
index 1aa7b06..ad1e091 100644
--- a/psycopg/xid_type.c
+++ b/psycopg/xid_type.c
@@ -331,21 +331,170 @@ XidObject *xid_ensure(PyObject *oxid)
char *
xid_get_tid(XidObject *self)
{
- /* TODO: for the moment just use the string mashed up by James.
- * later will implement the JDBC algorithm. */
- char *buf;
+ char *buf = NULL;
+ long format_id;
Py_ssize_t bufsize = 0;
- if (self->pg_xact_id) {
- bufsize = 1 + strlen(self->pg_xact_id);
- }
+ format_id = PyInt_AsLong(self->format_id);
+ if (-1 == format_id && PyErr_Occurred()) { goto exit; }
- buf = (char *)PyMem_Malloc(bufsize);
- if (buf) {
+ if (XID_UNPARSED == format_id) {
+ bufsize = 1 + PyString_Size(self->gtrid);
+ if (!(buf = (char *)PyMem_Malloc(bufsize))) {
+ PyErr_NoMemory();
+ goto exit;
+ }
+ strncpy(buf, PyString_AsString(self->gtrid), bufsize);
+ }
+ else {
+ /* TODO: for the moment just use the string mashed up by James.
+ * later will implement the JDBC algorithm. */
+ bufsize = 1 + strlen(self->pg_xact_id);
+ if (!(buf = (char *)PyMem_Malloc(bufsize))) {
+ PyErr_NoMemory();
+ goto exit;
+ }
strncpy(buf, self->pg_xact_id, bufsize);
}
+exit:
return buf;
}
+/* Build a Xid from a string representation.
+ *
+ * If the xid is in the format generated by Psycopg, unpack the tuple into
+ * the struct members. Otherwise generate an "unparsed" xid.
+ */
+XidObject *
+xid_from_string(PyObject *str) {
+ /* TODO: currently always generates an unparsed xid. */
+ XidObject *xid = NULL;
+ XidObject *rv = NULL;
+ PyObject *format_id = NULL;
+ PyObject *tmp;
+
+ /* fake args to work around the checks performed by the xid init */
+ if (!(xid = (XidObject *)PyObject_CallFunction((PyObject *)&XidType,
+ "iss", 0, "tmp", "tmp"))) {
+ goto exit;
+ }
+
+ /* set xid.gtrid */
+ tmp = xid->gtrid;
+ Py_INCREF(str);
+ xid->gtrid = str;
+ Py_DECREF(tmp);
+
+ /* set xid.format_id */
+ if (!(format_id = PyInt_FromLong(XID_UNPARSED))) { goto exit; }
+ tmp = xid->format_id;
+ xid->format_id = format_id;
+ format_id = NULL;
+ Py_DECREF(tmp);
+
+ /* set xid.bqual */
+ tmp = xid->bqual;
+ Py_INCREF(Py_None);
+ xid->bqual = Py_None;
+ Py_DECREF(tmp);
+
+ /* return the finished object */
+ rv = xid;
+ xid = NULL;
+
+exit:
+ Py_XDECREF(format_id);
+ Py_XDECREF(xid);
+
+ return rv;
+}
+
+/* conn_tpc_recover -- return a list of pending TPC Xid */
+
+PyObject *
+xid_recover(PyObject *conn)
+{
+ PyObject *rv = NULL;
+ PyObject *curs = NULL;
+ PyObject *xids = NULL;
+ XidObject *xid = NULL;
+ PyObject *recs = NULL;
+ PyObject *rec = NULL;
+ PyObject *item = NULL;
+ PyObject *tmp;
+ Py_ssize_t len, i;
+ /* curs = conn.cursor() */
+ if (!(curs = PyObject_CallMethod(conn, "cursor", NULL))) { goto exit; }
+
+ /* curs.execute(...) */
+ if (!(tmp = PyObject_CallMethod(curs, "execute", "s",
+ "SELECT gid, prepared, owner, database FROM pg_prepared_xacts;")))
+ {
+ goto exit;
+ }
+ Py_DECREF(tmp);
+
+ /* recs = curs.fetchall() */
+ if (!(recs = PyObject_CallMethod(curs, "fetchall", NULL))) { goto exit; }
+
+ /* curs.close() */
+ if (!(tmp = PyObject_CallMethod(curs, "close", NULL))) { goto exit; }
+ Py_DECREF(tmp);
+
+ /* Build the list with return values. */
+ if (0 > (len = PySequence_Size(recs))) { goto exit; }
+ if (!(xids = PyList_New(len))) { goto exit; }
+
+ /* populate the xids list */
+ for (i = 0; i < len; ++i) {
+ if (!(rec = PySequence_GetItem(recs, i))) { goto exit; }
+
+ /* Get the xid with the XA triple set */
+ if (!(item = PySequence_GetItem(rec, 0))) { goto exit; }
+ if (!(xid = xid_from_string(item))) { goto exit; }
+ Py_DECREF(item); item = NULL;
+
+ /* set xid.prepared */
+ if (!(item = PySequence_GetItem(rec, 1))) { goto exit; }
+ tmp = xid->prepared;
+ xid->prepared = item;
+ Py_DECREF(tmp);
+ item = NULL;
+
+ /* set xid.owner */
+ if (!(item = PySequence_GetItem(rec, 2))) { goto exit; }
+ tmp = xid->owner;
+ xid->owner = item;
+ Py_DECREF(tmp);
+ item = NULL;
+
+ /* set xid.database */
+ if (!(item = PySequence_GetItem(rec, 3))) { goto exit; }
+ tmp = xid->database;
+ xid->database = item;
+ Py_DECREF(tmp);
+ item = NULL;
+
+ /* xid finished: add it to the returned list */
+ PyList_SET_ITEM(xids, i, (PyObject *)xid);
+ xid = NULL; /* ref stolen */
+
+ Py_DECREF(rec); rec = NULL;
+ }
+
+ /* set the return value. */
+ rv = xids;
+ xids = NULL;
+
+exit:
+ Py_XDECREF(xids);
+ Py_XDECREF(xid);
+ Py_XDECREF(curs);
+ Py_XDECREF(recs);
+ Py_XDECREF(rec);
+ Py_XDECREF(item);
+
+ return rv;
+}
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 4eff867..6c6dddc 100644
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -1,6 +1,8 @@
#!/usr/bin/env python
import unittest
+from operator import attrgetter
+
import psycopg2
import psycopg2.extensions
import tests
@@ -64,6 +66,73 @@ class ConnectionTests(unittest.TestCase):
conn = self.connect()
self.assert_(conn.encoding in psycopg2.extensions.encodings)
+
+class ConnectionTwoPhaseTests(unittest.TestCase):
+ def setUp(self):
+ self.clear_test_xacts()
+
+ def tearDown(self):
+ self.clear_test_xacts()
+
+ def clear_test_xacts(self):
+ """Rollback all the prepared transaction in the testing db."""
+ cnn = self.connect()
+ cnn.set_isolation_level(0)
+ cur = cnn.cursor()
+ cur.execute(
+ "select gid from pg_prepared_xacts where database = %s",
+ (tests.dbname,))
+ gids = [ r[0] for r in cur ]
+ for gid in gids:
+ cur.execute("rollback prepared %s;", (gid,))
+ cnn.close()
+
+ def connect(self):
+ return psycopg2.connect(tests.dsn)
+
+ def test_status_after_recover(self):
+ cnn = self.connect()
+ self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status)
+ xns = cnn.tpc_recover()
+ self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status)
+
+ cur = cnn.cursor()
+ cur.execute("select 1")
+ self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status)
+ xns = cnn.tpc_recover()
+ self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status)
+
+ def test_recovered_xids(self):
+ # insert a few test xns
+ cnn = self.connect()
+ cnn.set_isolation_level(0)
+ cur = cnn.cursor()
+ cur.execute("begin; prepare transaction '1-foo';")
+ cur.execute("begin; prepare transaction '2-bar';")
+
+ # read the values to return
+ cur.execute("""
+ select gid, prepared, owner, database
+ from pg_prepared_xacts
+ where database = %s;""",
+ (tests.dbname,))
+ okvals = cur.fetchall()
+ okvals.sort()
+
+ cnn = self.connect()
+ xids = cnn.tpc_recover()
+ xids = [ xid for xid in xids if xid.database == tests.dbname ]
+ xids.sort(key=attrgetter('gtrid'))
+
+ # check the values returned
+ self.assertEqual(len(okvals), len(xids))
+ for (xid, (gid, prepared, owner, database)) in zip (xids, okvals):
+ self.assertEqual(xid.gtrid, gid)
+ self.assertEqual(xid.prepared, prepared)
+ self.assertEqual(xid.owner, owner)
+ self.assertEqual(xid.database, database)
+
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)