diff options
| author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-10-05 03:13:44 +0100 |
|---|---|---|
| committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-10-05 03:13:44 +0100 |
| commit | bc2aefeacface7f8636a8609b4f09ba40eb4f7b1 (patch) | |
| tree | 989e103557453e68a40f6cb586e18d85f3e4fec2 | |
| parent | 5b4d366f4e85558f11384df064cf6f69c9ef4030 (diff) | |
| download | psycopg2-bc2aefeacface7f8636a8609b4f09ba40eb4f7b1.tar.gz | |
cursor.mogrify() accepts unicode queries.
| -rw-r--r-- | ChangeLog | 2 | ||||
| -rw-r--r-- | psycopg/cursor_type.c | 68 | ||||
| -rw-r--r-- | tests/test_cursor.py | 31 |
3 files changed, 71 insertions, 30 deletions
@@ -2,6 +2,8 @@ * psycopg/cursor_type.c: Common code in execute() and mogrify() merged. + * psycopg/cursor_type.c: cursor.mogrify() accepts unicode queries. + 2010-09-23 Daniele Varrazzo <daniele.varrazzo@gmail.com> * lib/errorcodes.py: Added PostgreSQL 9.0 error codes. diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index ca9813f..8bc7c40 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -568,56 +568,64 @@ psyco_curs_executemany(cursorObject *self, PyObject *args, PyObject *kwargs) "mogrify(query, vars=None) -> str -- Return query after vars binding." static PyObject * -psyco_curs_mogrify(cursorObject *self, PyObject *args, PyObject *kwargs) +_psyco_curs_mogrify(cursorObject *self, + PyObject *operation, PyObject *vars) { - PyObject *vars = NULL, *cvt = NULL, *operation = NULL; - PyObject *fquery; - - static char *kwlist[] = {"query", "vars", NULL}; - - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O", kwlist, - &operation, &vars)) { - return NULL; - } - - if (PyUnicode_Check(operation)) { - PyErr_SetString(NotSupportedError, - "unicode queries not yet supported"); - return NULL; - } + PyObject *fquery = NULL, *cvt = NULL; - EXC_IF_CURS_CLOSED(self); - IFCLEARPGRES(self->pgres); + operation = _psyco_curs_validate_sql_basic(self, operation); + if (operation == NULL) { goto cleanup; } - /* note that we don't overwrite the last query executed on the cursor, we - just *return* the new query with bound variables + Dprintf("psyco_curs_mogrify: starting mogrify"); - TODO: refactor the common mogrification code (see psycopg_curs_execute - for comments, the code is amost identical) */ + /* here we are, and we have a sequence or a dictionary filled with + objects to be substituted (bound variables). we try to be smart and do + the right thing (i.e., what the user expects) */ - if (vars) + if (vars && vars != Py_None) { - if(_mogrify(vars, operation, self->conn, &cvt) == -1) return NULL; + if (_mogrify(vars, operation, self->conn, &cvt) == -1) { + goto cleanup; + } } if (vars && cvt) { if (!(fquery = _psyco_curs_merge_query_args(self, operation, cvt))) { - return NULL; + goto cleanup; } - Dprintf("psyco_curs_execute: cvt->refcnt = " FORMAT_CODE_PY_SSIZE_T + Dprintf("psyco_curs_mogrify: cvt->refcnt = " FORMAT_CODE_PY_SSIZE_T ", fquery->refcnt = " FORMAT_CODE_PY_SSIZE_T, - cvt->ob_refcnt, fquery->ob_refcnt - ); - Py_DECREF(cvt); + cvt->ob_refcnt, fquery->ob_refcnt); } else { fquery = operation; - Py_INCREF(operation); + Py_INCREF(fquery); } +cleanup: + Py_XDECREF(operation); + Py_XDECREF(cvt); + return fquery; } + +static PyObject * +psyco_curs_mogrify(cursorObject *self, PyObject *args, PyObject *kwargs) +{ + PyObject *vars = NULL, *operation = NULL; + + static char *kwlist[] = {"query", "vars", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O", kwlist, + &operation, &vars)) { + return NULL; + } + + EXC_IF_CURS_CLOSED(self); + + return _psyco_curs_mogrify(self, operation, vars); +} #endif diff --git a/tests/test_cursor.py b/tests/test_cursor.py index 38c368a..a2c3b24 100644 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -21,6 +21,37 @@ class CursorTests(unittest.TestCase): cur.close() conn.close() + def test_mogrify_unicode(self): + conn = self.connect() + cur = conn.cursor() + + # test consistency between execute and mogrify. + + # unicode query containing only ascii data + cur.execute(u"SELECT 'foo';") + self.assertEqual('foo', cur.fetchone()[0]) + self.assertEqual("SELECT 'foo';", cur.mogrify(u"SELECT 'foo';")) + + conn.set_client_encoding('UTF8') + snowman = u"\u2603" + + # unicode query with non-ascii data + cur.execute(u"SELECT '%s';" % snowman) + self.assertEqual(snowman.encode('utf8'), cur.fetchone()[0]) + self.assertEqual("SELECT '%s';" % snowman.encode('utf8'), + cur.mogrify(u"SELECT '%s';" % snowman).replace("E'", "'")) + + # unicode args + cur.execute("SELECT %s;", (snowman,)) + self.assertEqual(snowman.encode("utf-8"), cur.fetchone()[0]) + self.assertEqual("SELECT '%s';" % snowman.encode('utf8'), + cur.mogrify("SELECT %s;", (snowman,)).replace("E'", "'")) + + # unicode query and args + cur.execute(u"SELECT %s;", (snowman,)) + self.assertEqual(snowman.encode("utf-8"), cur.fetchone()[0]) + self.assertEqual("SELECT '%s';" % snowman.encode('utf8'), + cur.mogrify(u"SELECT %s;", (snowman,)).replace("E'", "'")) def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) |
