diff options
author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2011-06-01 09:07:02 +0100 |
---|---|---|
committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2011-06-01 09:07:02 +0100 |
commit | ea03ffbf76c24e7ba36cc6c94ec2b0c748700c20 (patch) | |
tree | fb442a0f4f9ffc2a28bdcebb4c8f5243bdaa67da | |
parent | a69facc7f0abf96493a2d269365f1392c0e8143c (diff) | |
download | psycopg2-ea03ffbf76c24e7ba36cc6c94ec2b0c748700c20.tar.gz |
Added partial implementation for set_transaction
autocommit to be implemented yet.
-rw-r--r-- | psycopg/connection.h | 9 | ||||
-rw-r--r-- | psycopg/connection_int.c | 40 | ||||
-rw-r--r-- | psycopg/connection_type.c | 149 | ||||
-rwxr-xr-x | tests/test_connection.py | 146 |
4 files changed, 336 insertions, 8 deletions
diff --git a/psycopg/connection.h b/psycopg/connection.h index 262e6ac..979b37f 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -136,6 +136,8 @@ HIDDEN int conn_connect(connectionObject *self, long int async); HIDDEN void conn_close(connectionObject *self); HIDDEN int conn_commit(connectionObject *self); HIDDEN int conn_rollback(connectionObject *self); +HIDDEN int conn_set(connectionObject *self, const char *param, const char *value); +HIDDEN int conn_set_autocommit(connectionObject *self, int value); HIDDEN int conn_switch_isolation_level(connectionObject *self, int level); HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc); HIDDEN int conn_poll(connectionObject *self); @@ -154,6 +156,13 @@ HIDDEN PyObject *conn_tpc_recover(connectionObject *self); "in asynchronous mode"); \ return NULL; } +#define EXC_IF_IN_TRANSACTION(self, cmd) \ + if (self->status != CONN_STATUS_READY) { \ + PyErr_Format(ProgrammingError, \ + "%s cannot be used inside a transaction", #cmd); \ + return NULL; \ + } + #define EXC_IF_TPC_NOT_SUPPORTED(self) \ if ((self)->server_version < 80100) { \ PyErr_Format(NotSupportedError, \ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 22c5bc5..b0ed41d 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -952,6 +952,46 @@ conn_rollback(connectionObject *self) return res; } +/* conn_set - set a guc parameter */ + +int +conn_set(connectionObject *self, const char *param, const char *value) +{ + char query[256]; + PGresult *pgres = NULL; + char *error = NULL; + int res = 1; + + Dprintf("conn_set: setting %s to %s", param, value); + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&self->lock); + + if (0 == strcmp(value, "default")) { + sprintf(query, "SET %s TO DEFAULT;", param); + } + else { + sprintf(query, "SET %s TO '%s';", param, value); + } + + res = pq_execute_command_locked(self, query, &pgres, &error, &_save); + + pthread_mutex_unlock(&self->lock); + Py_END_ALLOW_THREADS; + + if (res < 0) { + pq_complete_error(self, &pgres, &error); + } + + return res; +} + +int +conn_set_autocommit(connectionObject *self, int value) +{ + return -1; +} + /* conn_switch_isolation_level - switch isolation level on the connection */ int diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 2d02b86..299888a 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -187,6 +187,7 @@ psyco_conn_tpc_begin(connectionObject *self, PyObject *args) EXC_IF_CONN_CLOSED(self); EXC_IF_CONN_ASYNC(self, tpc_begin); EXC_IF_TPC_NOT_SUPPORTED(self); + EXC_IF_IN_TRANSACTION(self, tpc_begin); if (!PyArg_ParseTuple(args, "O", &oxid)) { goto exit; @@ -196,13 +197,6 @@ psyco_conn_tpc_begin(connectionObject *self, PyObject *args) goto exit; } - /* check we are not in a transaction */ - if (self->status != CONN_STATUS_READY) { - PyErr_SetString(ProgrammingError, - "tpc_begin must be called outside a transaction"); - goto exit; - } - /* two phase commit and autocommit make no point */ if (self->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT) { PyErr_SetString(ProgrammingError, @@ -384,6 +378,145 @@ psyco_conn_tpc_recover(connectionObject *self, PyObject *args) #ifdef PSYCOPG_EXTENSIONS + +/* parse a python object into one of the possible isolation level values */ + +static const char * +_psyco_conn_parse_isolevel(PyObject *pyval) +{ + const char **value = NULL; + + static const char *isolevels[] = { + "", /* autocommit */ + "read uncommitted", + "read committed", + "repeatable read", + "serializable", + "default", + NULL }; + + /* parse from one of the level constants */ + if (PyInt_Check(pyval)) { + long level = PyInt_AsLong(pyval); + if (level == -1 && PyErr_Occurred()) { return NULL; } + if (level < 1 || level > 4) { + PyErr_SetString(PyExc_ValueError, + "isolation_level must be between 1 and 4"); + return NULL; + } + value = isolevels + level; + } + + /* parse from the string -- this includes "default" */ + else { + value = isolevels; + while (*(++value)) { + int cmp; + PyObject *pylevel; + if (!(pylevel = Text_FromUTF8(*value))) { return NULL; } + cmp = PyObject_RichCompareBool(pylevel, pyval, Py_EQ); + Py_DECREF(pylevel); + if (-1 == cmp) { return NULL; } + if (cmp) { break; } + } + if (!*value) { + PyErr_SetString(PyExc_ValueError, + "bad value for isolation_level"); /* TODO: emit value */ + } + } + return *value; +} + +/* convert True/False/"default" into a C string */ + +static const char * +_psyco_conn_parse_onoff(PyObject *pyval) +{ + int istrue = PyObject_IsTrue(pyval); + if (-1 == istrue) { return NULL; } + if (istrue) { + int cmp; + PyObject *pydef; + if (!(pydef = Text_FromUTF8("default"))) { return NULL; } + cmp = PyObject_RichCompareBool(pyval, pydef, Py_EQ); + Py_DECREF(pydef); + if (-1 == cmp) { return NULL; } + return cmp ? "default" : "on"; + } + else { + return "off"; + } +} + +/* set_transaction - default transaction characteristics */ + +#define psyco_conn_set_transaction_doc \ +"set_transaction(...) -- Set one or more parameters for the next transactions.\n\n" \ +"Accepted arguments are 'isolation_level', 'readonly', 'deferrable', 'autocommit'." + +static PyObject * +psyco_conn_set_transaction(connectionObject *self, PyObject *args, PyObject *kwargs) +{ + PyObject *isolation_level = Py_None; + PyObject *readonly = Py_None; + PyObject *deferrable = Py_None; + PyObject *autocommit = Py_None; + + static char *kwlist[] = + {"isolation_level", "readonly", "deferrable", "autocommit", NULL}; + + EXC_IF_CONN_CLOSED(self); + EXC_IF_CONN_ASYNC(self, set_transaction); + EXC_IF_IN_TRANSACTION(self, set_transaction); + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOOO", kwlist, + &isolation_level, &readonly, &deferrable, &autocommit)) { + return NULL; + } + + if (Py_None != isolation_level) { + const char *value = NULL; + if (!(value = _psyco_conn_parse_isolevel(isolation_level))) { + return NULL; + } + if (0 != conn_set(self, "default_transaction_isolation", value)) { + return NULL; + } + } + + if (Py_None != readonly) { + const char *value = NULL; + if (!(value = _psyco_conn_parse_onoff(readonly))) { + return NULL; + } + if (0 != conn_set(self, "default_transaction_read_only", value)) { + return NULL; + } + } + + if (Py_None != deferrable) { + const char *value = NULL; + if (!(value = _psyco_conn_parse_onoff(deferrable))) { + return NULL; + } + if (0 != conn_set(self, "default_transaction_deferrable", value)) { + return NULL; + } + } + + if (Py_None != autocommit) { + int value = PyObject_IsTrue(autocommit); + if (-1 == value) { return NULL; } + if (0 != conn_set_autocommit(self, value)) { + return NULL; + } + } + + Py_INCREF(Py_None); + return Py_None; +} + + /* set_isolation_level method - switch connection isolation level */ #define psyco_conn_set_isolation_level_doc \ @@ -717,6 +850,8 @@ static struct PyMethodDef connectionObject_methods[] = { {"tpc_recover", (PyCFunction)psyco_conn_tpc_recover, METH_NOARGS, psyco_conn_tpc_recover_doc}, #ifdef PSYCOPG_EXTENSIONS + {"set_transaction", (PyCFunction)psyco_conn_set_transaction, + METH_VARARGS|METH_KEYWORDS, psyco_conn_set_transaction_doc}, {"set_isolation_level", (PyCFunction)psyco_conn_set_isolation_level, METH_VARARGS, psyco_conn_set_isolation_level_doc}, {"set_client_encoding", (PyCFunction)psyco_conn_set_client_encoding, diff --git a/tests/test_connection.py b/tests/test_connection.py index 89b104f..18c9277 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -25,7 +25,8 @@ import os import time import threading -from testutils import unittest, decorate_all_tests, skip_before_postgres +from testutils import unittest, decorate_all_tests +from testutils import skip_before_postgres, skip_after_postgres from operator import attrgetter import psycopg2 @@ -707,6 +708,149 @@ from testutils import skip_if_tpc_disabled decorate_all_tests(ConnectionTwoPhaseTests, skip_if_tpc_disabled) +class TransactionControlTests(unittest.TestCase): + def setUp(self): + self.conn = psycopg2.connect(dsn) + + def tearDown(self): + if not self.conn.closed: + self.conn.close() + + def test_not_in_transaction(self): + cur = self.conn.cursor() + cur.execute("select 1") + self.assertRaises(psycopg2.ProgrammingError, + self.conn.set_transaction, + psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + + def test_set_isolation_level(self): + cur = self.conn.cursor() + self.conn.set_transaction( + psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.set_transaction( + psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ) + cur.execute("SHOW default_transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(cur.fetchone()[0], 'repeatable read') + else: + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.set_transaction( + isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + self.conn.set_transaction( + isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED) + cur.execute("SHOW default_transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(cur.fetchone()[0], 'read uncommitted') + else: + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + def test_set_isolation_level_str(self): + cur = self.conn.cursor() + self.conn.set_transaction("serializable") + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.set_transaction("repeatable read") + cur.execute("SHOW default_transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(cur.fetchone()[0], 'repeatable read') + else: + self.assertEqual(cur.fetchone()[0], 'serializable') + self.conn.rollback() + + self.conn.set_transaction("read committed") + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + self.conn.set_transaction("read uncommitted") + cur.execute("SHOW default_transaction_isolation;") + if self.conn.server_version > 80000: + self.assertEqual(cur.fetchone()[0], 'read uncommitted') + else: + self.assertEqual(cur.fetchone()[0], 'read committed') + self.conn.rollback() + + def test_bad_isolation_level(self): + self.assertRaises(ValueError, self.conn.set_transaction, 0) + self.assertRaises(ValueError, self.conn.set_transaction, 5) + self.assertRaises(ValueError, self.conn.set_transaction, 'whatever') + + def test_set_read_only(self): + cur = self.conn.cursor() + self.conn.set_transaction(readonly=True) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + + cur = self.conn.cursor() + self.conn.set_transaction(readonly=None) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + + self.conn.set_transaction(readonly=False) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'off') + self.conn.rollback() + + def test_set_default(self): + cur = self.conn.cursor() + cur.execute("SHOW default_transaction_isolation;") + default_isolevel = cur.fetchone()[0] + cur.execute("SHOW default_transaction_read_only;") + default_readonly = cur.fetchone()[0] + self.conn.rollback() + + self.conn.set_transaction(isolation_level='serializable', readonly=True) + self.conn.set_transaction(isolation_level='default', readonly='default') + + cur.execute("SHOW default_transaction_isolation;") + self.assertEqual(cur.fetchone()[0], default_isolevel) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], default_readonly) + + @skip_before_postgres(9, 1) + def test_set_deferrable(self): + cur = self.conn.cursor() + self.conn.set_transaction(readonly=True, deferrable=True) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + cur.execute("SHOW default_transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + cur.execute("SHOW default_transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'on') + self.conn.rollback() + + self.conn.set_transaction(deferrable=False) + cur.execute("SHOW default_transaction_read_only;") + self.assertEqual(cur.fetchone()[0], 'on') + cur.execute("SHOW default_transaction_deferrable;") + self.assertEqual(cur.fetchone()[0], 'off') + self.conn.rollback() + + @skip_after_postgres(9, 1) + def test_set_deferrable_error(self): + self.assertRaises(psycopg2.ProgrammingError, + self.conn.set_transaction, readonly=True, deferrable=True) + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) |