diff options
| -rw-r--r-- | psycopg/connection_int.c | 59 | ||||
| -rw-r--r-- | psycopg/pqpath.c | 21 | ||||
| -rw-r--r-- | psycopg/pqpath.h | 1 |
3 files changed, 58 insertions, 23 deletions
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index ebd710f..e1670e4 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -791,6 +791,32 @@ _conn_poll_connecting(connectionObject *self) } +/* Advance to the next state after a call to a pq_is_busy* function */ +int +_conn_poll_advance_read(connectionObject *self, int busy) +{ + int res; + + switch (busy) { + case 0: /* result is ready */ + res = PSYCO_POLL_OK; + Dprintf("conn_poll: async_status -> ASYNC_DONE"); + self->async_status = ASYNC_DONE; + break; + case 1: /* result not ready: fd would block */ + res = PSYCO_POLL_READ; + break; + case -1: /* ouch, error */ + res = PSYCO_POLL_ERROR; + break; + default: + Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy); + res = PSYCO_POLL_ERROR; + break; + } + return res; +} + /* Poll the connection for the send query/retrieve result phase Advance the async_status (usually going WRITE -> READ -> DONE) but don't @@ -822,42 +848,29 @@ _conn_poll_query(connectionObject *self) case ASYNC_READ: Dprintf("conn_poll: async_status = ASYNC_READ"); - if (0 == PQconsumeInput(self->pgconn)) { - PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); - res = PSYCO_POLL_ERROR; + if (self->async) { + res = _conn_poll_advance_read(self, pq_is_busy(self)); } - if (PQisBusy(self->pgconn)) { - res = PSYCO_POLL_READ; - } else { - /* Reading complete: set the async status so that a spare poll() - will only look for NOTIFYs */ - self->async_status = ASYNC_DONE; - res = PSYCO_POLL_OK; + else { + /* we are a green connection being polled as result of a query. + this means that our caller has the lock and we are being called + from the callback. If we tried to acquire the lock now it would + be a deadlock. */ + res = _conn_poll_advance_read(self, pq_is_busy_locked(self)); } break; case ASYNC_DONE: Dprintf("conn_poll: async_status = ASYNC_DONE"); /* We haven't asked anything: just check for notifications. */ - switch (pq_is_busy(self)) { - case 0: /* will not block */ - res = PSYCO_POLL_OK; - break; - case 1: /* will block */ - res = PSYCO_POLL_READ; - break; - case -1: /* ouch, error */ - break; - default: - Dprintf("conn_poll: unexpected result from pq_is_busy"); - break; - } + res = _conn_poll_advance_read(self, pq_is_busy(self)); break; default: Dprintf("conn_poll: in unexpected async status: %d", self->async_status); res = PSYCO_POLL_ERROR; + break; } return res; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 0704701..8821151 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -643,6 +643,27 @@ pq_is_busy(connectionObject *conn) return res; } +/* pq_is_busy_locked - equivalent to pq_is_busy but we already have the lock + * + * The function should be called with the lock and holding the GIL. + */ + +int +pq_is_busy_locked(connectionObject *conn) +{ + Dprintf("pq_is_busy_locked: consuming input"); + + if (PQconsumeInput(conn->pgconn) == 0) { + Dprintf("pq_is_busy_locked: PQconsumeInput() failed"); + PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); + return -1; + } + + /* We can't call conn_notice_process/conn_notifies_process because + they try to get the lock. We don't need anyway them because at the end of + the loop we are in (async reading) pq_fetch will be called. */ + return PQisBusy(conn->pgconn); +} /* pq_flush - flush output and return connection status diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index cd57546..f1cfc09 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -46,6 +46,7 @@ HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres, HIDDEN int pq_abort(connectionObject *conn); HIDDEN int pq_reset(connectionObject *conn); HIDDEN int pq_is_busy(connectionObject *conn); +HIDDEN int pq_is_busy_locked(connectionObject *conn); HIDDEN int pq_flush(connectionObject *conn); HIDDEN void pq_clear_async(connectionObject *conn); HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg, int pyerr); |
