summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2011-06-01 09:07:02 +0100
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2011-06-01 09:07:02 +0100
commitea03ffbf76c24e7ba36cc6c94ec2b0c748700c20 (patch)
treefb442a0f4f9ffc2a28bdcebb4c8f5243bdaa67da
parenta69facc7f0abf96493a2d269365f1392c0e8143c (diff)
downloadpsycopg2-ea03ffbf76c24e7ba36cc6c94ec2b0c748700c20.tar.gz
Added partial implementation for set_transaction
autocommit to be implemented yet.
-rw-r--r--psycopg/connection.h9
-rw-r--r--psycopg/connection_int.c40
-rw-r--r--psycopg/connection_type.c149
-rwxr-xr-xtests/test_connection.py146
4 files changed, 336 insertions, 8 deletions
diff --git a/psycopg/connection.h b/psycopg/connection.h
index 262e6ac..979b37f 100644
--- a/psycopg/connection.h
+++ b/psycopg/connection.h
@@ -136,6 +136,8 @@ HIDDEN int conn_connect(connectionObject *self, long int async);
HIDDEN void conn_close(connectionObject *self);
HIDDEN int conn_commit(connectionObject *self);
HIDDEN int conn_rollback(connectionObject *self);
+HIDDEN int conn_set(connectionObject *self, const char *param, const char *value);
+HIDDEN int conn_set_autocommit(connectionObject *self, int value);
HIDDEN int conn_switch_isolation_level(connectionObject *self, int level);
HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc);
HIDDEN int conn_poll(connectionObject *self);
@@ -154,6 +156,13 @@ HIDDEN PyObject *conn_tpc_recover(connectionObject *self);
"in asynchronous mode"); \
return NULL; }
+#define EXC_IF_IN_TRANSACTION(self, cmd) \
+ if (self->status != CONN_STATUS_READY) { \
+ PyErr_Format(ProgrammingError, \
+ "%s cannot be used inside a transaction", #cmd); \
+ return NULL; \
+ }
+
#define EXC_IF_TPC_NOT_SUPPORTED(self) \
if ((self)->server_version < 80100) { \
PyErr_Format(NotSupportedError, \
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index 22c5bc5..b0ed41d 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -952,6 +952,46 @@ conn_rollback(connectionObject *self)
return res;
}
+/* conn_set - set a guc parameter */
+
+int
+conn_set(connectionObject *self, const char *param, const char *value)
+{
+ char query[256];
+ PGresult *pgres = NULL;
+ char *error = NULL;
+ int res = 1;
+
+ Dprintf("conn_set: setting %s to %s", param, value);
+
+ Py_BEGIN_ALLOW_THREADS;
+ pthread_mutex_lock(&self->lock);
+
+ if (0 == strcmp(value, "default")) {
+ sprintf(query, "SET %s TO DEFAULT;", param);
+ }
+ else {
+ sprintf(query, "SET %s TO '%s';", param, value);
+ }
+
+ res = pq_execute_command_locked(self, query, &pgres, &error, &_save);
+
+ pthread_mutex_unlock(&self->lock);
+ Py_END_ALLOW_THREADS;
+
+ if (res < 0) {
+ pq_complete_error(self, &pgres, &error);
+ }
+
+ return res;
+}
+
+int
+conn_set_autocommit(connectionObject *self, int value)
+{
+ return -1;
+}
+
/* conn_switch_isolation_level - switch isolation level on the connection */
int
diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c
index 2d02b86..299888a 100644
--- a/psycopg/connection_type.c
+++ b/psycopg/connection_type.c
@@ -187,6 +187,7 @@ psyco_conn_tpc_begin(connectionObject *self, PyObject *args)
EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, tpc_begin);
EXC_IF_TPC_NOT_SUPPORTED(self);
+ EXC_IF_IN_TRANSACTION(self, tpc_begin);
if (!PyArg_ParseTuple(args, "O", &oxid)) {
goto exit;
@@ -196,13 +197,6 @@ psyco_conn_tpc_begin(connectionObject *self, PyObject *args)
goto exit;
}
- /* check we are not in a transaction */
- if (self->status != CONN_STATUS_READY) {
- PyErr_SetString(ProgrammingError,
- "tpc_begin must be called outside a transaction");
- goto exit;
- }
-
/* two phase commit and autocommit make no point */
if (self->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT) {
PyErr_SetString(ProgrammingError,
@@ -384,6 +378,145 @@ psyco_conn_tpc_recover(connectionObject *self, PyObject *args)
#ifdef PSYCOPG_EXTENSIONS
+
+/* parse a python object into one of the possible isolation level values */
+
+static const char *
+_psyco_conn_parse_isolevel(PyObject *pyval)
+{
+ const char **value = NULL;
+
+ static const char *isolevels[] = {
+ "", /* autocommit */
+ "read uncommitted",
+ "read committed",
+ "repeatable read",
+ "serializable",
+ "default",
+ NULL };
+
+ /* parse from one of the level constants */
+ if (PyInt_Check(pyval)) {
+ long level = PyInt_AsLong(pyval);
+ if (level == -1 && PyErr_Occurred()) { return NULL; }
+ if (level < 1 || level > 4) {
+ PyErr_SetString(PyExc_ValueError,
+ "isolation_level must be between 1 and 4");
+ return NULL;
+ }
+ value = isolevels + level;
+ }
+
+ /* parse from the string -- this includes "default" */
+ else {
+ value = isolevels;
+ while (*(++value)) {
+ int cmp;
+ PyObject *pylevel;
+ if (!(pylevel = Text_FromUTF8(*value))) { return NULL; }
+ cmp = PyObject_RichCompareBool(pylevel, pyval, Py_EQ);
+ Py_DECREF(pylevel);
+ if (-1 == cmp) { return NULL; }
+ if (cmp) { break; }
+ }
+ if (!*value) {
+ PyErr_SetString(PyExc_ValueError,
+ "bad value for isolation_level"); /* TODO: emit value */
+ }
+ }
+ return *value;
+}
+
+/* convert True/False/"default" into a C string */
+
+static const char *
+_psyco_conn_parse_onoff(PyObject *pyval)
+{
+ int istrue = PyObject_IsTrue(pyval);
+ if (-1 == istrue) { return NULL; }
+ if (istrue) {
+ int cmp;
+ PyObject *pydef;
+ if (!(pydef = Text_FromUTF8("default"))) { return NULL; }
+ cmp = PyObject_RichCompareBool(pyval, pydef, Py_EQ);
+ Py_DECREF(pydef);
+ if (-1 == cmp) { return NULL; }
+ return cmp ? "default" : "on";
+ }
+ else {
+ return "off";
+ }
+}
+
+/* set_transaction - default transaction characteristics */
+
+#define psyco_conn_set_transaction_doc \
+"set_transaction(...) -- Set one or more parameters for the next transactions.\n\n" \
+"Accepted arguments are 'isolation_level', 'readonly', 'deferrable', 'autocommit'."
+
+static PyObject *
+psyco_conn_set_transaction(connectionObject *self, PyObject *args, PyObject *kwargs)
+{
+ PyObject *isolation_level = Py_None;
+ PyObject *readonly = Py_None;
+ PyObject *deferrable = Py_None;
+ PyObject *autocommit = Py_None;
+
+ static char *kwlist[] =
+ {"isolation_level", "readonly", "deferrable", "autocommit", NULL};
+
+ EXC_IF_CONN_CLOSED(self);
+ EXC_IF_CONN_ASYNC(self, set_transaction);
+ EXC_IF_IN_TRANSACTION(self, set_transaction);
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOOO", kwlist,
+ &isolation_level, &readonly, &deferrable, &autocommit)) {
+ return NULL;
+ }
+
+ if (Py_None != isolation_level) {
+ const char *value = NULL;
+ if (!(value = _psyco_conn_parse_isolevel(isolation_level))) {
+ return NULL;
+ }
+ if (0 != conn_set(self, "default_transaction_isolation", value)) {
+ return NULL;
+ }
+ }
+
+ if (Py_None != readonly) {
+ const char *value = NULL;
+ if (!(value = _psyco_conn_parse_onoff(readonly))) {
+ return NULL;
+ }
+ if (0 != conn_set(self, "default_transaction_read_only", value)) {
+ return NULL;
+ }
+ }
+
+ if (Py_None != deferrable) {
+ const char *value = NULL;
+ if (!(value = _psyco_conn_parse_onoff(deferrable))) {
+ return NULL;
+ }
+ if (0 != conn_set(self, "default_transaction_deferrable", value)) {
+ return NULL;
+ }
+ }
+
+ if (Py_None != autocommit) {
+ int value = PyObject_IsTrue(autocommit);
+ if (-1 == value) { return NULL; }
+ if (0 != conn_set_autocommit(self, value)) {
+ return NULL;
+ }
+ }
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
/* set_isolation_level method - switch connection isolation level */
#define psyco_conn_set_isolation_level_doc \
@@ -717,6 +850,8 @@ static struct PyMethodDef connectionObject_methods[] = {
{"tpc_recover", (PyCFunction)psyco_conn_tpc_recover,
METH_NOARGS, psyco_conn_tpc_recover_doc},
#ifdef PSYCOPG_EXTENSIONS
+ {"set_transaction", (PyCFunction)psyco_conn_set_transaction,
+ METH_VARARGS|METH_KEYWORDS, psyco_conn_set_transaction_doc},
{"set_isolation_level", (PyCFunction)psyco_conn_set_isolation_level,
METH_VARARGS, psyco_conn_set_isolation_level_doc},
{"set_client_encoding", (PyCFunction)psyco_conn_set_client_encoding,
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 89b104f..18c9277 100755
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -25,7 +25,8 @@
import os
import time
import threading
-from testutils import unittest, decorate_all_tests, skip_before_postgres
+from testutils import unittest, decorate_all_tests
+from testutils import skip_before_postgres, skip_after_postgres
from operator import attrgetter
import psycopg2
@@ -707,6 +708,149 @@ from testutils import skip_if_tpc_disabled
decorate_all_tests(ConnectionTwoPhaseTests, skip_if_tpc_disabled)
+class TransactionControlTests(unittest.TestCase):
+ def setUp(self):
+ self.conn = psycopg2.connect(dsn)
+
+ def tearDown(self):
+ if not self.conn.closed:
+ self.conn.close()
+
+ def test_not_in_transaction(self):
+ cur = self.conn.cursor()
+ cur.execute("select 1")
+ self.assertRaises(psycopg2.ProgrammingError,
+ self.conn.set_transaction,
+ psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+
+ def test_set_isolation_level(self):
+ cur = self.conn.cursor()
+ self.conn.set_transaction(
+ psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.set_transaction(
+ psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ)
+ cur.execute("SHOW default_transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(cur.fetchone()[0], 'repeatable read')
+ else:
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.set_transaction(
+ isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ self.conn.set_transaction(
+ isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED)
+ cur.execute("SHOW default_transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(cur.fetchone()[0], 'read uncommitted')
+ else:
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ def test_set_isolation_level_str(self):
+ cur = self.conn.cursor()
+ self.conn.set_transaction("serializable")
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.set_transaction("repeatable read")
+ cur.execute("SHOW default_transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(cur.fetchone()[0], 'repeatable read')
+ else:
+ self.assertEqual(cur.fetchone()[0], 'serializable')
+ self.conn.rollback()
+
+ self.conn.set_transaction("read committed")
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ self.conn.set_transaction("read uncommitted")
+ cur.execute("SHOW default_transaction_isolation;")
+ if self.conn.server_version > 80000:
+ self.assertEqual(cur.fetchone()[0], 'read uncommitted')
+ else:
+ self.assertEqual(cur.fetchone()[0], 'read committed')
+ self.conn.rollback()
+
+ def test_bad_isolation_level(self):
+ self.assertRaises(ValueError, self.conn.set_transaction, 0)
+ self.assertRaises(ValueError, self.conn.set_transaction, 5)
+ self.assertRaises(ValueError, self.conn.set_transaction, 'whatever')
+
+ def test_set_read_only(self):
+ cur = self.conn.cursor()
+ self.conn.set_transaction(readonly=True)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+
+ cur = self.conn.cursor()
+ self.conn.set_transaction(readonly=None)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+
+ self.conn.set_transaction(readonly=False)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'off')
+ self.conn.rollback()
+
+ def test_set_default(self):
+ cur = self.conn.cursor()
+ cur.execute("SHOW default_transaction_isolation;")
+ default_isolevel = cur.fetchone()[0]
+ cur.execute("SHOW default_transaction_read_only;")
+ default_readonly = cur.fetchone()[0]
+ self.conn.rollback()
+
+ self.conn.set_transaction(isolation_level='serializable', readonly=True)
+ self.conn.set_transaction(isolation_level='default', readonly='default')
+
+ cur.execute("SHOW default_transaction_isolation;")
+ self.assertEqual(cur.fetchone()[0], default_isolevel)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], default_readonly)
+
+ @skip_before_postgres(9, 1)
+ def test_set_deferrable(self):
+ cur = self.conn.cursor()
+ self.conn.set_transaction(readonly=True, deferrable=True)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ cur.execute("SHOW default_transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+ cur.execute("SHOW default_transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ self.conn.rollback()
+
+ self.conn.set_transaction(deferrable=False)
+ cur.execute("SHOW default_transaction_read_only;")
+ self.assertEqual(cur.fetchone()[0], 'on')
+ cur.execute("SHOW default_transaction_deferrable;")
+ self.assertEqual(cur.fetchone()[0], 'off')
+ self.conn.rollback()
+
+ @skip_after_postgres(9, 1)
+ def test_set_deferrable_error(self):
+ self.assertRaises(psycopg2.ProgrammingError,
+ self.conn.set_transaction, readonly=True, deferrable=True)
+
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)