summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--NEWS3
-rw-r--r--psycopg/connection.h1
-rw-r--r--psycopg/connection_int.c24
-rw-r--r--psycopg/green.c33
-rw-r--r--psycopg/python.h5
-rw-r--r--sandbox/test_green_error.py81
-rwxr-xr-xtests/test_green.py19
7 files changed, 139 insertions, 27 deletions
diff --git a/NEWS b/NEWS
index 4478e58..2e5477e 100644
--- a/NEWS
+++ b/NEWS
@@ -16,6 +16,9 @@ What's new in psycopg 2.4.6
- Dropped GIL release during string adaptation around a function call
invoking a Python API function, which could cause interpreter crash.
Thanks to Manu Cupcic for the report (ticket #110).
+ - Close a green connection if there is an error in the callback.
+ Maybe a harsh solution but it leaves the program responsive
+ (ticket #113).
- 'register_hstore()', 'register_composite()', 'tpc_recover()' work with
RealDictConnection and Cursor (ticket #114).
- connect() raises an exception instead of swallowing keyword arguments
diff --git a/psycopg/connection.h b/psycopg/connection.h
index 9647ffd..01cc6a4 100644
--- a/psycopg/connection.h
+++ b/psycopg/connection.h
@@ -141,6 +141,7 @@ HIDDEN void conn_notifies_process(connectionObject *self);
RAISES_NEG HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn);
HIDDEN int conn_connect(connectionObject *self, long int async);
HIDDEN void conn_close(connectionObject *self);
+HIDDEN void conn_close_locked(connectionObject *self);
RAISES_NEG HIDDEN int conn_commit(connectionObject *self);
RAISES_NEG HIDDEN int conn_rollback(connectionObject *self);
RAISES_NEG HIDDEN int conn_set_session(connectionObject *self, const char *isolevel,
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index 8c8fed4..a93c233 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -922,12 +922,24 @@ conn_close(connectionObject *self)
return;
}
- /* sets this connection as closed even for other threads; also note that
- we need to check the value of pgconn, because we get called even when
- the connection fails! */
+ /* sets this connection as closed even for other threads; */
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
+ conn_close_locked(self);
+
+ pthread_mutex_unlock(&self->lock);
+ Py_END_ALLOW_THREADS;
+}
+
+/* conn_close_locked - shut down the connection with the lock already taken */
+
+void conn_close_locked(connectionObject *self)
+{
+ if (self->closed) {
+ return;
+ }
+
/* We used to call pq_abort_locked here, but the idea of issuing a
* rollback on close/GC has been considered inappropriate.
*
@@ -937,9 +949,10 @@ conn_close(connectionObject *self)
* transaction though: to avoid these problems the transaction should be
* closed only in status CONN_STATUS_READY.
*/
-
self->closed = 1;
+ /* we need to check the value of pgconn, because we get called even when
+ * the connection fails! */
if (self->pgconn) {
PQfinish(self->pgconn);
self->pgconn = NULL;
@@ -947,9 +960,6 @@ conn_close(connectionObject *self)
PQfreeCancel(self->cancel);
self->cancel = NULL;
}
-
- pthread_mutex_unlock(&self->lock);
- Py_END_ALLOW_THREADS;
}
/* conn_commit - commit on a connection */
diff --git a/psycopg/green.c b/psycopg/green.c
index 65578f5..3ffa810 100644
--- a/psycopg/green.c
+++ b/psycopg/green.c
@@ -34,7 +34,7 @@
HIDDEN PyObject *wait_callback = NULL;
static PyObject *have_wait_callback(void);
-static void psyco_clear_result_blocking(connectionObject *conn);
+static void green_panic(connectionObject *conn);
/* Register a callback function to block waiting for data.
*
@@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
conn->async_status = ASYNC_WRITE;
if (0 != psyco_wait(conn)) {
- psyco_clear_result_blocking(conn);
+ green_panic(conn);
goto end;
}
@@ -192,22 +192,21 @@ end:
}
-/* Discard the result of the currenly executed query, blocking.
- *
- * This function doesn't honour the wait callback: it can be used in case of
- * emergency if the callback fails in order to put the connection back into a
- * consistent state.
- *
- * If any command was issued before clearing the result, libpq would fail with
- * the error "another command is already in progress".
+/* There has been a communication error during query execution. It may have
+ * happened e.g. for a network error or an error in the callback, and we
+ * cannot tell the two apart.
+ * Trying to PQcancel or PQgetResult to put the connection back into a working
+ * state doesn't work nice (issue #113): the program blocks and the
+ * interpreter won't even respond to SIGINT. PQreset could work async, but the
+ * python program would have then a connection made but not configured where
+ * it is probably not designed to handled. So for the moment we do the kindest
+ * thing we can: we close the connection. A long-running program should
+ * already have a way to discard broken connections; a short-lived one would
+ * benefit of working ctrl-c.
*/
static void
-psyco_clear_result_blocking(connectionObject *conn)
+green_panic(connectionObject *conn)
{
- PGresult *res;
-
- Dprintf("psyco_clear_result_blocking");
- while (NULL != (res = PQgetResult(conn->pgconn))) {
- PQclear(res);
- }
+ Dprintf("green_panic: closing the connection");
+ conn_close_locked(conn);
}
diff --git a/psycopg/python.h b/psycopg/python.h
index 6d87fa5..f6d6be0 100644
--- a/psycopg/python.h
+++ b/psycopg/python.h
@@ -35,6 +35,11 @@
# error "psycopg requires Python >= 2.4"
#endif
+#if PY_VERSION_HEX < 0x02050000
+/* Function missing in Py 2.4 */
+#define PyErr_WarnEx(cat,msg,lvl) PyErr_Warn(cat,msg)
+#endif
+
#if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN)
typedef int Py_ssize_t;
#define PY_SSIZE_T_MIN INT_MIN
diff --git a/sandbox/test_green_error.py b/sandbox/test_green_error.py
new file mode 100644
index 0000000..7477382
--- /dev/null
+++ b/sandbox/test_green_error.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+"""Test for issue #113: test with error during green processing
+"""
+
+DSN = 'dbname=test'
+
+import eventlet.patcher
+eventlet.patcher.monkey_patch()
+
+import os
+import signal
+from time import sleep
+
+import psycopg2
+from psycopg2 import extensions
+from eventlet.hubs import trampoline
+
+
+# register a test wait callback that fails if SIGHUP is received
+
+panic = []
+
+def wait_cb(conn):
+ """A wait callback useful to allow eventlet to work with Psycopg."""
+ while 1:
+ if panic:
+ raise Exception('whatever')
+
+ state = conn.poll()
+ if state == extensions.POLL_OK:
+ break
+ elif state == extensions.POLL_READ:
+ trampoline(conn.fileno(), read=True)
+ elif state == extensions.POLL_WRITE:
+ trampoline(conn.fileno(), write=True)
+ else:
+ raise psycopg2.OperationalError(
+ "Bad result from poll: %r" % state)
+
+extensions.set_wait_callback(wait_cb)
+
+
+# SIGHUP handler to inject a fail in the callback
+
+def handler(signum, frame):
+ panic.append(True)
+
+signal.signal(signal.SIGHUP, handler)
+
+
+# Simulate another green thread working
+
+def worker():
+ while 1:
+ print "I'm working"
+ sleep(1)
+
+eventlet.spawn(worker)
+
+
+# You can unplug the network cable etc. here.
+# Kill -HUP will raise an exception in the callback.
+
+print "PID", os.getpid()
+conn = psycopg2.connect(DSN)
+curs = conn.cursor()
+try:
+ for i in range(1000):
+ curs.execute("select %s, pg_sleep(1)", (i,))
+ r = curs.fetchone()
+ print "selected", r
+
+except BaseException, e:
+ print "got exception:", e.__class__.__name__, e
+
+if conn.closed:
+ print "the connection is closed"
+else:
+ conn.rollback()
+ curs.execute("select 1")
+ print curs.fetchone()
diff --git a/tests/test_green.py b/tests/test_green.py
index d641d18..e0cd57d 100755
--- a/tests/test_green.py
+++ b/tests/test_green.py
@@ -79,6 +79,9 @@ class GreenTests(unittest.TestCase):
warnings.warn("sending a large query didn't trigger block on write.")
def test_error_in_callback(self):
+ # behaviour changed after issue #113: if there is an error in the
+ # callback for the moment we don't have a way to reset the connection
+ # without blocking (ticket #113) so just close it.
conn = self.conn
curs = conn.cursor()
curs.execute("select 1") # have a BEGIN
@@ -88,11 +91,21 @@ class GreenTests(unittest.TestCase):
psycopg2.extensions.set_wait_callback(lambda conn: 1//0)
self.assertRaises(ZeroDivisionError, curs.execute, "select 2")
+ self.assert_(conn.closed)
+
+ def test_dont_freak_out(self):
+ # if there is an error in a green query, don't freak out and close
+ # the connection
+ conn = self.conn
+ curs = conn.cursor()
+ self.assertRaises(psycopg2.ProgrammingError,
+ curs.execute, "select the unselectable")
+
# check that the connection is left in an usable state
- psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
+ self.assert_(not conn.closed)
conn.rollback()
- curs.execute("select 2")
- self.assertEqual(2, curs.fetchone()[0])
+ curs.execute("select 1")
+ self.assertEqual(curs.fetchone()[0], 1)
def test_suite():