diff options
| -rw-r--r-- | psycopg/connection_int.c | 59 | ||||
| -rw-r--r-- | psycopg/pqpath.c | 21 | ||||
| -rw-r--r-- | psycopg/pqpath.h | 1 | 
3 files changed, 58 insertions, 23 deletions
| diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index ebd710f..e1670e4 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -791,6 +791,32 @@ _conn_poll_connecting(connectionObject *self)  } +/* Advance to the next state after a call to a pq_is_busy* function */ +int +_conn_poll_advance_read(connectionObject *self, int busy) +{ +    int res; + +    switch (busy) { +    case 0: /* result is ready */ +        res = PSYCO_POLL_OK; +        Dprintf("conn_poll: async_status -> ASYNC_DONE"); +        self->async_status = ASYNC_DONE; +        break; +    case 1: /* result not ready: fd would block */ +        res = PSYCO_POLL_READ; +        break; +    case -1: /* ouch, error */ +        res = PSYCO_POLL_ERROR; +        break; +    default: +        Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy); +        res = PSYCO_POLL_ERROR; +        break; +    } +    return res; +} +  /* Poll the connection for the send query/retrieve result phase    Advance the async_status (usually going WRITE -> READ -> DONE) but don't @@ -822,42 +848,29 @@ _conn_poll_query(connectionObject *self)      case ASYNC_READ:          Dprintf("conn_poll: async_status = ASYNC_READ"); -        if (0 == PQconsumeInput(self->pgconn)) { -            PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); -            res = PSYCO_POLL_ERROR; +        if (self->async) { +            res = _conn_poll_advance_read(self, pq_is_busy(self));          } -        if (PQisBusy(self->pgconn)) { -            res = PSYCO_POLL_READ; -        } else { -            /* Reading complete: set the async status so that a spare poll() -               will only look for NOTIFYs */ -            self->async_status = ASYNC_DONE; -            res = PSYCO_POLL_OK; +        else { +            /* we are a green connection being polled as result of a query. +              this means that our caller has the lock and we are being called +              from the callback. If we tried to acquire the lock now it would +              be a deadlock. */ +            res = _conn_poll_advance_read(self, pq_is_busy_locked(self));          }          break;      case ASYNC_DONE:          Dprintf("conn_poll: async_status = ASYNC_DONE");          /* We haven't asked anything: just check for notifications. */ -        switch (pq_is_busy(self)) { -        case 0: /* will not block */ -            res = PSYCO_POLL_OK; -            break; -        case 1: /* will block */ -            res = PSYCO_POLL_READ; -            break; -        case -1: /* ouch, error */ -            break; -        default: -            Dprintf("conn_poll: unexpected result from pq_is_busy"); -            break; -        } +        res = _conn_poll_advance_read(self, pq_is_busy(self));          break;      default:          Dprintf("conn_poll: in unexpected async status: %d",                  self->async_status);          res = PSYCO_POLL_ERROR; +        break;      }      return res; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 0704701..8821151 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -643,6 +643,27 @@ pq_is_busy(connectionObject *conn)      return res;  } +/* pq_is_busy_locked - equivalent to pq_is_busy but we already have the lock + * + * The function should be called with the lock and holding the GIL. + */ + +int +pq_is_busy_locked(connectionObject *conn) +{ +    Dprintf("pq_is_busy_locked: consuming input"); + +    if (PQconsumeInput(conn->pgconn) == 0) { +        Dprintf("pq_is_busy_locked: PQconsumeInput() failed"); +        PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); +        return -1; +    } + +    /* We can't call conn_notice_process/conn_notifies_process because +      they try to get the lock. We don't need anyway them because at the end of +      the loop we are in (async reading) pq_fetch will be called. */ +    return PQisBusy(conn->pgconn); +}  /* pq_flush - flush output and return connection status diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index cd57546..f1cfc09 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -46,6 +46,7 @@ HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres,  HIDDEN int pq_abort(connectionObject *conn);  HIDDEN int pq_reset(connectionObject *conn);  HIDDEN int pq_is_busy(connectionObject *conn); +HIDDEN int pq_is_busy_locked(connectionObject *conn);  HIDDEN int pq_flush(connectionObject *conn);  HIDDEN void pq_clear_async(connectionObject *conn);  HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg, int pyerr); | 
