diff options
| -rw-r--r-- | psycopg/green.c | 23 | ||||
| -rw-r--r-- | tests/test_green.py | 16 |
2 files changed, 39 insertions, 0 deletions
diff --git a/psycopg/green.c b/psycopg/green.c index fc9605d..df1c02e 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -33,6 +33,7 @@ HIDDEN PyObject *wait_callback = NULL; PyObject *have_wait_callback(void); +void psyco_clear_result_blocking(connectionObject *conn); /* Register a callback function to block waiting for data. * @@ -163,6 +164,7 @@ psyco_exec_green(connectionObject *conn, const char *command) pyrv = PyObject_CallFunctionObjArgs(cb, conn, NULL); if (!pyrv) { Dprintf("psyco_exec_green: error in callback sending query"); + psyco_clear_result_blocking(conn); goto clear; } Py_DECREF(pyrv); @@ -173,6 +175,7 @@ psyco_exec_green(connectionObject *conn, const char *command) pyrv = PyObject_CallFunctionObjArgs(cb, conn, NULL); if (!pyrv) { Dprintf("psyco_exec_green: error in callback reading result"); + psyco_clear_result_blocking(conn); goto clear; } Py_DECREF(pyrv); @@ -197,3 +200,23 @@ end: return result; } + +/* Discard the result of the currenly executed query, blocking. + * + * This function doesn't honour the wait callback: it can be used in case of + * emergency if the callback fails in order to put the connection back into a + * consistent state. + * + * If any command was issued before clearing the result, libpq would fail with + * the error "another command is already in progress". + */ +void +psyco_clear_result_blocking(connectionObject *conn) +{ + PGresult *res; + + Dprintf("psyco_clear_result_blocking"); + while (NULL != (res = PQgetResult(conn->pgconn))) { + PQclear(res); + } +} diff --git a/tests/test_green.py b/tests/test_green.py index fec0d2a..b4e743b 100644 --- a/tests/test_green.py +++ b/tests/test_green.py @@ -52,6 +52,22 @@ class GreenTests(unittest.TestCase): self.fail("sending a large query didn't trigger block on write.") + def test_error_in_callback(self): + conn = self.connect() + curs = conn.cursor() + curs.execute("select 1") # have a BEGIN + curs.fetchone() + + # now try to do something that will fail in the callback + psycopg2.extensions.set_wait_callback(lambda conn: 1/0) + self.assertRaises(ZeroDivisionError, curs.execute, "select 2") + + # check that the connection is left in an usable state + psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select) + conn.rollback() + curs.execute("select 2") + self.assertEqual(2, curs.fetchone()[0]) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) |
