summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--psycopg/connection_int.c59
-rw-r--r--psycopg/pqpath.c21
-rw-r--r--psycopg/pqpath.h1
3 files changed, 58 insertions, 23 deletions
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index ebd710f..e1670e4 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -791,6 +791,32 @@ _conn_poll_connecting(connectionObject *self)
}
+/* Advance to the next state after a call to a pq_is_busy* function */
+int
+_conn_poll_advance_read(connectionObject *self, int busy)
+{
+ int res;
+
+ switch (busy) {
+ case 0: /* result is ready */
+ res = PSYCO_POLL_OK;
+ Dprintf("conn_poll: async_status -> ASYNC_DONE");
+ self->async_status = ASYNC_DONE;
+ break;
+ case 1: /* result not ready: fd would block */
+ res = PSYCO_POLL_READ;
+ break;
+ case -1: /* ouch, error */
+ res = PSYCO_POLL_ERROR;
+ break;
+ default:
+ Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy);
+ res = PSYCO_POLL_ERROR;
+ break;
+ }
+ return res;
+}
+
/* Poll the connection for the send query/retrieve result phase
Advance the async_status (usually going WRITE -> READ -> DONE) but don't
@@ -822,42 +848,29 @@ _conn_poll_query(connectionObject *self)
case ASYNC_READ:
Dprintf("conn_poll: async_status = ASYNC_READ");
- if (0 == PQconsumeInput(self->pgconn)) {
- PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
- res = PSYCO_POLL_ERROR;
+ if (self->async) {
+ res = _conn_poll_advance_read(self, pq_is_busy(self));
}
- if (PQisBusy(self->pgconn)) {
- res = PSYCO_POLL_READ;
- } else {
- /* Reading complete: set the async status so that a spare poll()
- will only look for NOTIFYs */
- self->async_status = ASYNC_DONE;
- res = PSYCO_POLL_OK;
+ else {
+ /* we are a green connection being polled as result of a query.
+ this means that our caller has the lock and we are being called
+ from the callback. If we tried to acquire the lock now it would
+ be a deadlock. */
+ res = _conn_poll_advance_read(self, pq_is_busy_locked(self));
}
break;
case ASYNC_DONE:
Dprintf("conn_poll: async_status = ASYNC_DONE");
/* We haven't asked anything: just check for notifications. */
- switch (pq_is_busy(self)) {
- case 0: /* will not block */
- res = PSYCO_POLL_OK;
- break;
- case 1: /* will block */
- res = PSYCO_POLL_READ;
- break;
- case -1: /* ouch, error */
- break;
- default:
- Dprintf("conn_poll: unexpected result from pq_is_busy");
- break;
- }
+ res = _conn_poll_advance_read(self, pq_is_busy(self));
break;
default:
Dprintf("conn_poll: in unexpected async status: %d",
self->async_status);
res = PSYCO_POLL_ERROR;
+ break;
}
return res;
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index 0704701..8821151 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -643,6 +643,27 @@ pq_is_busy(connectionObject *conn)
return res;
}
+/* pq_is_busy_locked - equivalent to pq_is_busy but we already have the lock
+ *
+ * The function should be called with the lock and holding the GIL.
+ */
+
+int
+pq_is_busy_locked(connectionObject *conn)
+{
+ Dprintf("pq_is_busy_locked: consuming input");
+
+ if (PQconsumeInput(conn->pgconn) == 0) {
+ Dprintf("pq_is_busy_locked: PQconsumeInput() failed");
+ PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
+ return -1;
+ }
+
+ /* We can't call conn_notice_process/conn_notifies_process because
+ they try to get the lock. We don't need anyway them because at the end of
+ the loop we are in (async reading) pq_fetch will be called. */
+ return PQisBusy(conn->pgconn);
+}
/* pq_flush - flush output and return connection status
diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h
index cd57546..f1cfc09 100644
--- a/psycopg/pqpath.h
+++ b/psycopg/pqpath.h
@@ -46,6 +46,7 @@ HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres,
HIDDEN int pq_abort(connectionObject *conn);
HIDDEN int pq_reset(connectionObject *conn);
HIDDEN int pq_is_busy(connectionObject *conn);
+HIDDEN int pq_is_busy_locked(connectionObject *conn);
HIDDEN int pq_flush(connectionObject *conn);
HIDDEN void pq_clear_async(connectionObject *conn);
HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg, int pyerr);