summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--psycopg/green.c23
-rw-r--r--tests/test_green.py16
2 files changed, 39 insertions, 0 deletions
diff --git a/psycopg/green.c b/psycopg/green.c
index fc9605d..df1c02e 100644
--- a/psycopg/green.c
+++ b/psycopg/green.c
@@ -33,6 +33,7 @@
HIDDEN PyObject *wait_callback = NULL;
PyObject *have_wait_callback(void);
+void psyco_clear_result_blocking(connectionObject *conn);
/* Register a callback function to block waiting for data.
*
@@ -163,6 +164,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
pyrv = PyObject_CallFunctionObjArgs(cb, conn, NULL);
if (!pyrv) {
Dprintf("psyco_exec_green: error in callback sending query");
+ psyco_clear_result_blocking(conn);
goto clear;
}
Py_DECREF(pyrv);
@@ -173,6 +175,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
pyrv = PyObject_CallFunctionObjArgs(cb, conn, NULL);
if (!pyrv) {
Dprintf("psyco_exec_green: error in callback reading result");
+ psyco_clear_result_blocking(conn);
goto clear;
}
Py_DECREF(pyrv);
@@ -197,3 +200,23 @@ end:
return result;
}
+
+/* Discard the result of the currenly executed query, blocking.
+ *
+ * This function doesn't honour the wait callback: it can be used in case of
+ * emergency if the callback fails in order to put the connection back into a
+ * consistent state.
+ *
+ * If any command was issued before clearing the result, libpq would fail with
+ * the error "another command is already in progress".
+ */
+void
+psyco_clear_result_blocking(connectionObject *conn)
+{
+ PGresult *res;
+
+ Dprintf("psyco_clear_result_blocking");
+ while (NULL != (res = PQgetResult(conn->pgconn))) {
+ PQclear(res);
+ }
+}
diff --git a/tests/test_green.py b/tests/test_green.py
index fec0d2a..b4e743b 100644
--- a/tests/test_green.py
+++ b/tests/test_green.py
@@ -52,6 +52,22 @@ class GreenTests(unittest.TestCase):
self.fail("sending a large query didn't trigger block on write.")
+ def test_error_in_callback(self):
+ conn = self.connect()
+ curs = conn.cursor()
+ curs.execute("select 1") # have a BEGIN
+ curs.fetchone()
+
+ # now try to do something that will fail in the callback
+ psycopg2.extensions.set_wait_callback(lambda conn: 1/0)
+ self.assertRaises(ZeroDivisionError, curs.execute, "select 2")
+
+ # check that the connection is left in an usable state
+ psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
+ conn.rollback()
+ curs.execute("select 2")
+ self.assertEqual(2, curs.fetchone()[0])
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)