diff options
| -rw-r--r-- | psycopg/connection.h | 17 | ||||
| -rw-r--r-- | psycopg/connection_int.c | 503 | ||||
| -rw-r--r-- | psycopg/connection_type.c | 124 | ||||
| -rw-r--r-- | psycopg/green.c | 5 | ||||
| -rw-r--r-- | psycopg/pqpath.c | 21 | ||||
| -rw-r--r-- | psycopg/pqpath.h | 1 | 
6 files changed, 185 insertions, 486 deletions
| diff --git a/psycopg/connection.h b/psycopg/connection.h index 8bf64fe..69f1385 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -45,16 +45,6 @@ extern "C" {  #define CONN_STATUS_DATESTYLE             21  #define CONN_STATUS_CLIENT_ENCODING       22 -/* TODO: REMOVE THOSE */ -#define CONN_STATUS_SYNC  3 -#define CONN_STATUS_ASYNC 4 -#define CONN_STATUS_SEND_DATESTYLE             5 -#define CONN_STATUS_SENT_DATESTYLE             6 -#define CONN_STATUS_GET_DATESTYLE              7 -#define CONN_STATUS_SEND_CLIENT_ENCODING       8 -#define CONN_STATUS_SENT_CLIENT_ENCODING       9 -#define CONN_STATUS_GET_CLIENT_ENCODING        10 -  /* async query execution status */  #define ASYNC_DONE  0  #define ASYNC_READ  1 @@ -137,12 +127,7 @@ HIDDEN int  conn_commit(connectionObject *self);  HIDDEN int  conn_rollback(connectionObject *self);  HIDDEN int  conn_switch_isolation_level(connectionObject *self, int level);  HIDDEN int  conn_set_client_encoding(connectionObject *self, const char *enc); -HIDDEN PyObject *conn_poll_connect_send(connectionObject *self); -HIDDEN PyObject *conn_poll_connect_fetch(connectionObject *self); -HIDDEN PyObject *conn_poll_ready(connectionObject *self); -HIDDEN PyObject *conn_poll_send(connectionObject *self); -HIDDEN PyObject *conn_poll_fetch(connectionObject *self); -HIDDEN PyObject *conn_poll_green(connectionObject *self); +HIDDEN int  conn_poll(connectionObject *self);  /* exception-raising macros */  #define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index f398238..b878749 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -208,7 +208,6 @@ conn_get_encoding(PGresult *pgres)      for (i=0 ; i < strlen(tmp) ; i++)          encoding[i] = toupper(tmp[i]);      encoding[i] = '\0'; -    CLEARPGRES(pgres);      return encoding;  } @@ -258,10 +257,18 @@ conn_setup(connectionObject *self, PGconn *pgconn)  {      PGresult *pgres; +    self->equote = conn_get_standard_conforming_strings(pgconn); +    self->server_version = conn_get_server_version(pgconn); +    self->protocol = conn_get_protocol_version(self->pgconn); +      Py_BEGIN_ALLOW_THREADS;      pthread_mutex_lock(&self->lock);      Py_BLOCK_THREADS; +    if (pq_set_non_blocking(self, 1, 1) != 0) { +        return -1; +    } +      if (!psyco_green()) {          Py_UNBLOCK_THREADS;          pgres = PQexec(pgconn, psyco_datestyle); @@ -305,6 +312,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)          Py_BLOCK_THREADS;          return -1;      } +    CLEARPGRES(pgres);      if (!psyco_green()) {          Py_UNBLOCK_THREADS; @@ -334,8 +342,8 @@ conn_setup(connectionObject *self, PGconn *pgconn)  /* conn_connect - execute a connection to the database */ -int -conn_sync_connect(connectionObject *self) +static int +_conn_sync_connect(connectionObject *self)  {      PGconn *pgconn;      PyObject *wait_rv; @@ -382,19 +390,11 @@ conn_sync_connect(connectionObject *self)          }      } -    self->equote = conn_get_standard_conforming_strings(pgconn); -    self->server_version = conn_get_server_version(pgconn); -    self->protocol = conn_get_protocol_version(self->pgconn); -      /* From here the connection is considered ready: with the new status,       * poll() will use PQisBusy instead of PQconnectPoll.       */      self->status = CONN_STATUS_READY; -    if (pq_set_non_blocking(self, 1, 1) != 0) { -        return -1; -    } -      if (conn_setup(self, self->pgconn) == -1) {          return -1;      } @@ -403,7 +403,7 @@ conn_sync_connect(connectionObject *self)  }  static int -conn_async_connect(connectionObject *self) +_conn_async_connect(connectionObject *self)  {      PGconn *pgconn; @@ -426,7 +426,10 @@ conn_async_connect(connectionObject *self)      PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self); -    self->status = CONN_STATUS_SETUP; +    /* The connection will be completed banging on poll(): +     * First with _conn_poll_connecting() that will finish connection, +     * then with _conn_poll_setup_async() that will do the same job +     * of setup_async(). */      return 0;  } @@ -436,340 +439,23 @@ conn_connect(connectionObject *self, long int async)  {     if (async == 1) {        Dprintf("con_connect: connecting in ASYNC mode"); -      return conn_async_connect(self); +      return _conn_async_connect(self);      }      else {        Dprintf("con_connect: connecting in SYNC mode"); -      return conn_sync_connect(self); -    } -} - -/* conn_poll_connect_send - handle connection polling when flushing output -   during asynchronous connection attempt. */ - -PyObject * -conn_poll_connect_send(connectionObject *self) -{ -    const char *query = NULL; -    int next_status; -    int ret; - -    Dprintf("conn_poll_connect_send: status %d", self->status); - -    switch (self->status) { -    case CONN_STATUS_SEND_DATESTYLE: -        /* set the datestyle */ -        query = psyco_datestyle; -        next_status = CONN_STATUS_SENT_DATESTYLE; -        break; -    case CONN_STATUS_SEND_CLIENT_ENCODING: -        /* get the client_encoding */ -        query = psyco_client_encoding; -        next_status = CONN_STATUS_SENT_CLIENT_ENCODING; -        break; -    case CONN_STATUS_SENT_DATESTYLE: -    case CONN_STATUS_SENT_CLIENT_ENCODING: -        /* the query has only been partially sent */ -        query = NULL; -        next_status = self->status; -        break; -    default: -        /* unexpected state, error out */ -        PyErr_Format(OperationalError, -                     "unexpected state: %d", self->status); -        return NULL; -    } - -    Dprintf("conn_poll_connect_send: sending query %-.200s", query); - -    Py_BEGIN_ALLOW_THREADS; -    pthread_mutex_lock(&(self->lock)); - -    if (query != NULL) { -        if (PQsendQuery(self->pgconn, query) != 1) { -            pthread_mutex_unlock(&(self->lock)); -            Py_BLOCK_THREADS; -            PyErr_SetString(OperationalError, -                            PQerrorMessage(self->pgconn)); -            return NULL; -        } -    } - -    if (PQflush(self->pgconn) == 0) { -        /* the query got fully sent to the server */ -        Dprintf("conn_poll_connect_send: query got flushed immediately"); -        /* the return value will be POLL_READ */ -        ret = PSYCO_POLL_READ; - -        /* advance the next status, since we skip over the "waiting for the -           query to be sent" status */ -        switch (next_status) { -        case CONN_STATUS_SENT_DATESTYLE: -            next_status = CONN_STATUS_GET_DATESTYLE; -            break; -        case CONN_STATUS_SENT_CLIENT_ENCODING: -            next_status = CONN_STATUS_GET_CLIENT_ENCODING; -            break; -        } -    } -    else { -        /* query did not get sent completely, tell the client to wait for the -           socket to become writable */ -        ret = PSYCO_POLL_WRITE; -    } - -    self->status = next_status; -    Dprintf("conn_poll_connect_send: next status is %d, returning %d", -            self->status, ret); - -    pthread_mutex_unlock(&(self->lock)); -    Py_END_ALLOW_THREADS; - -    return PyInt_FromLong(ret); -} - -/* conn_poll_connect_fetch - handle connection polling when reading result -   during asynchronous connection attempt. */ - -PyObject * -conn_poll_connect_fetch(connectionObject *self) -{ -    PGresult *pgres; -    int is_busy; -    int next_status; -    int ret; - -    Dprintf("conn_poll_connect_fetch: status %d", self->status); - -    /* consume the input */ -    is_busy = pq_is_busy(self); -    if (is_busy == -1) { -        /* there was an error, raise the exception */ -        return NULL; -    } -    else if (is_busy == 1) { -        /* the connection is busy, tell the user to wait more */ -        Dprintf("conn_poll_connect_fetch: connection busy, returning %d", -                PSYCO_POLL_READ); -        return PyInt_FromLong(PSYCO_POLL_READ); -    } - -    Py_BEGIN_ALLOW_THREADS; -    pthread_mutex_lock(&(self->lock)); - -    /* connection no longer busy, process input */ -    pgres = PQgetResult(self->pgconn); - -    /* do the rest while holding the GIL, we won't be calling into any -       blocking API */ -    pthread_mutex_unlock(&(self->lock)); -    Py_END_ALLOW_THREADS; - -    Dprintf("conn_poll_connect_fetch: got result %p", pgres); - -    /* we expect COMMAND_OK (from SET) or TUPLES_OK (from SHOW) */ -    if (pgres == NULL || (PQresultStatus(pgres) != PGRES_COMMAND_OK && -                          PQresultStatus(pgres) != PGRES_TUPLES_OK)) { -        PyErr_SetString(OperationalError, "can't issue " -                        "initial connection queries"); -        IFCLEARPGRES(pgres); -        return NULL; -    } - -    if (self->status == CONN_STATUS_GET_DATESTYLE) { -        /* got the result from SET DATESTYLE*/ -        Dprintf("conn_poll_connect_fetch: datestyle set"); -        next_status = CONN_STATUS_SEND_CLIENT_ENCODING; -    } -    else if (self->status == CONN_STATUS_GET_CLIENT_ENCODING) { -        /* got the client_encoding */ -        self->encoding = conn_get_encoding(pgres); -        if (self->encoding == NULL) { -            return NULL; -        } -        Dprintf("conn_poll_connect_fetch: got client_encoding %s", self->encoding); - -        /* since this is the last step, set the other instance variables now */ -        self->equote = conn_get_standard_conforming_strings(self->pgconn); -        self->protocol = conn_get_protocol_version(self->pgconn); -        self->server_version = conn_get_server_version(self->pgconn); -        /* -         * asynchronous connections always use isolation level 0, the user is -         * expected to manage the transactions himself, by sending -         * (asynchronously) BEGIN and COMMIT statements. -         */ -        self->isolation_level = 0; - -        /* FIXME: this is a bug: the above queries were sent to the server -          with a blocking connection */ -        if (pq_set_non_blocking(self, 1, 1) != 0) { -            return NULL; -        } - -        /* next status is going to READY */ -        next_status = CONN_STATUS_READY; -    } -    else { -        /* unexpected state, error out */ -        PyErr_Format(OperationalError, -                     "unexpected state: %d", self->status); -        return NULL; +      return _conn_sync_connect(self);      } - -    /* clear any leftover result, there should be none, but the protocol -       requires calling PQgetResult until you get a NULL */ -    pq_clear_async(self); - -    self->status = next_status; - -    /* if the curent status is READY it means we got the result of the -       last initialization query, so we return POLL_OK, otherwise we need to -       send another query, so return POLL_WRITE */ -    ret = self->status == CONN_STATUS_READY ? PSYCO_POLL_OK : PSYCO_POLL_WRITE; -    Dprintf("conn_poll_connect_fetch: next status is %d, returning %d", -            self->status, ret); -    return PyInt_FromLong(ret);  } -/* conn_poll_ready - handle connection polling when it is already open */ - -PyObject * -conn_poll_ready(connectionObject *self) -{ -    int is_busy; - -    /* if there is an asynchronous query underway, poll it */ -    if (self->async_cursor != NULL) { -        if (self->async_status == ASYNC_WRITE) { -            return conn_poll_send(self); -        } -        else { -            /* this gets called both for ASYNC_READ and ASYNC_DONE, because -               even if the async query is complete, we still might want to -               check for NOTIFYs */ -            return conn_poll_fetch(self); -        } -    } - -    /* otherwise just check for NOTIFYs */ -    is_busy = pq_is_busy(self); -    if (is_busy == -1) { -        /* there was an error, raise the exception */ -        return NULL; -    } -    else if (is_busy == 1) { -        /* the connection is busy, tell the user to wait more */ -        Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_READ); -        return PyInt_FromLong(PSYCO_POLL_READ); -    } -    else { -        /* connection is idle */ -        Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_OK); -        return PyInt_FromLong(PSYCO_POLL_OK); -    } -} - -/* conn_poll_send - poll the connection when flushing data to the backend */ - -PyObject * -conn_poll_send(connectionObject *self) -{ -    int res; - -    /* flush queued output to the server */ -    res = pq_flush(self); - -    if (res == 1) { -        /* some data still waiting to be flushed */ -        Dprintf("conn_poll_send: returning %d", PSYCO_POLL_WRITE); -        return PyInt_FromLong(PSYCO_POLL_WRITE); -    } -    else if (res == 0) { -        /* all data flushed, start waiting for results */ -        Dprintf("conn_poll_send: returning %d", PSYCO_POLL_READ); -        self->async_status = ASYNC_READ; -        return PyInt_FromLong(PSYCO_POLL_READ); -    } -    else { -        /* unexpected result */ -        PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); -        return NULL; -    } -} - -/* conn_poll_fetch - poll the connection when reading results from the backend - * - * If self_curs is available, use it to store the result of the last query. - * Also unlink it when finished. - */ - -PyObject * -conn_poll_fetch(connectionObject *self) -{ -    int is_busy; -    int last_result; - -    /* consume the input */ -    is_busy = pq_is_busy(self); -    if (is_busy == -1) { -        /* there was an error, raise the exception */ -        return NULL; -    } -    else if (is_busy == 1) { -        /* the connection is busy, tell the user to wait more */ -        Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_READ); -        return PyInt_FromLong(PSYCO_POLL_READ); -    } - -    /* try to fetch the data only if this was a poll following a read -       request; else just return POLL_OK to the user: this is necessary -       because of asynchronous NOTIFYs that can be sent by the backend -       even if the user didn't asked for them */ - -    if (self->async_status == ASYNC_READ && self->async_cursor) { -        cursorObject *curs = (cursorObject *)self->async_cursor; -        IFCLEARPGRES(curs->pgres); -        curs->pgres = pq_get_last_result(self); - -        /* fetch the tuples (if there are any) and build the result. We don't -         * care if pq_fetch return 0 or 1, but if there was an error, we want to -         * signal it to the caller. */ -        last_result = pq_fetch(curs) == -1 ? -1 : 0; - -        /* We have finished with our async_cursor */ -        Py_XDECREF(self->async_cursor); -        self->async_cursor = NULL; -    } -    else { -        last_result = 0; -    } - -    if (last_result == 0) { -        Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_OK); -        /* self->async_status cannot be ASYNC_WRITE here, because we -           never execute curs_poll_fetch in ASYNC_WRITE state, so we can -           safely set it to ASYNC_DONE because we either fetched the result or -           there is no result to fetch */ -        self->async_status = ASYNC_DONE; -        return PyInt_FromLong(PSYCO_POLL_OK); -    } -    else if (last_result == 1) { -        Dprintf("conn_poll_fetch: got result, but data remaining, " -                "returning %d", PSYCO_POLL_READ); -        return PyInt_FromLong(PSYCO_POLL_READ); -    } -    else { -        return NULL; -    } -}  /* poll during a connection attempt until the connection has established. */ -int +static int  _conn_poll_connecting(connectionObject *self)  {      int res = PSYCO_POLL_ERROR; +    Dprintf("conn_poll: poll connecting");      switch (PQconnectPoll(self->pgconn)) {      case PGRES_POLLING_OK:          res = PSYCO_POLL_OK; @@ -792,11 +478,13 @@ _conn_poll_connecting(connectionObject *self)  /* Advance to the next state after an attempt of flushing output */ -int + +static int  _conn_poll_advance_write(connectionObject *self, int flush)  {      int res; +    Dprintf("conn_poll: poll writing");      switch (flush) {      case  0:  /* success */          /* we've finished pushing the query to the server. Let's start @@ -821,11 +509,12 @@ _conn_poll_advance_write(connectionObject *self, int flush)  }  /* Advance to the next state after a call to a pq_is_busy* function */ -int +static int  _conn_poll_advance_read(connectionObject *self, int busy)  {      int res; +    Dprintf("conn_poll: poll reading");      switch (busy) {      case 0: /* result is ready */          res = PSYCO_POLL_OK; @@ -850,7 +539,8 @@ _conn_poll_advance_read(connectionObject *self, int busy)    Advance the async_status (usually going WRITE -> READ -> DONE) but don't    mess with the connection status. */ -int + +static int  _conn_poll_query(connectionObject *self)  {      int res = PSYCO_POLL_ERROR; @@ -891,30 +581,148 @@ _conn_poll_query(connectionObject *self)      return res;  } +/* Advance to the next state during an async connection setup + * + * If the connection is green, this is performed by the regular + * sync code so the queries are sent by conn_setup() while in + * CONN_STATUS_READY state. + */ +static int +_conn_poll_setup_async(connectionObject *self) +{ +    int res = PSYCO_POLL_ERROR; +    PGresult *pgres; + +    switch (self->status) { +    case CONN_STATUS_CONNECTING: +        /* Set the connection to nonblocking now. */ +        if (pq_set_non_blocking(self, 1, 1) != 0) { +            break; +        } + +        self->equote = conn_get_standard_conforming_strings(self->pgconn); +        self->protocol = conn_get_protocol_version(self->pgconn); +        self->server_version = conn_get_server_version(self->pgconn); -/* conn_poll_green - poll a *sync* connection with external wait */ +        /* asynchronous connections always use isolation level 0, the user is +         * expected to manage the transactions himself, by sending +         * (asynchronously) BEGIN and COMMIT statements. +         */ +        self->isolation_level = 0; -PyObject * -conn_poll_green(connectionObject *self) +        Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE"); +        self->status = CONN_STATUS_DATESTYLE; +        if (0 == pq_send_query(self, psyco_datestyle)) { +            PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); +            break; +        } +        Dprintf("conn_poll: async_status -> ASYNC_WRITE"); +        self->async_status = ASYNC_WRITE; +        res = PSYCO_POLL_WRITE; +        break; + +    case CONN_STATUS_DATESTYLE: +        res = _conn_poll_query(self); +        if (res == PSYCO_POLL_OK) { +            res = PSYCO_POLL_ERROR; +            pgres = pq_get_last_result(self); +            if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) { +                PyErr_SetString(OperationalError, "can't set datestyle to ISO"); +                break; +            } +            CLEARPGRES(pgres); + +            Dprintf("conn_poll: status -> CONN_STATUS_CLIENT_ENCODING"); +            self->status = CONN_STATUS_CLIENT_ENCODING; +            if (0 == pq_send_query(self, psyco_client_encoding)) { +                PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); +                break; +            } +            Dprintf("conn_poll: async_status -> ASYNC_WRITE"); +            self->async_status = ASYNC_WRITE; +            res = PSYCO_POLL_WRITE; +        } +        break; + +    case CONN_STATUS_CLIENT_ENCODING: +        res = _conn_poll_query(self); +        if (res == PSYCO_POLL_OK) { +            res = PSYCO_POLL_ERROR; +            pgres = pq_get_last_result(self); +            if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) { +                PyErr_SetString(OperationalError, "can't fetch client_encoding"); +                break; +            } + +            /* conn_get_encoding returns a malloc'd string */ +            self->encoding = conn_get_encoding(pgres); +            CLEARPGRES(pgres); +            if (self->encoding == NULL) { break; } + +            Dprintf("conn_poll: status -> CONN_STATUS_READY"); +            self->status = CONN_STATUS_READY; +            res = PSYCO_POLL_OK; +        } +        break; +    } + +    return res; +} + + +/* conn_poll - Main polling switch + * + * The function is called in all the states and connection types and invokes + * the right "next step". + */ + +int +conn_poll(connectionObject *self)  {      int res = PSYCO_POLL_ERROR; +    Dprintf("conn_poll: status = %d", self->status);      switch (self->status) {      case CONN_STATUS_SETUP: -        Dprintf("conn_poll: status = CONN_STATUS_SETUP"); +        Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING");          self->status = CONN_STATUS_CONNECTING; -        res =  PSYCO_POLL_WRITE; +        res = PSYCO_POLL_WRITE;          break;      case CONN_STATUS_CONNECTING: -        Dprintf("conn_poll: status = CONN_STATUS_CONNECTING");          res = _conn_poll_connecting(self); +        if (res == PSYCO_POLL_OK && self->async) { +            res = _conn_poll_setup_async(self); +        } +        break; + +    case CONN_STATUS_DATESTYLE: +    case CONN_STATUS_CLIENT_ENCODING: +        res = _conn_poll_setup_async(self);          break;      case CONN_STATUS_READY:      case CONN_STATUS_BEGIN: -        Dprintf("conn_poll: status = CONN_STATUS_READY/BEGIN");          res = _conn_poll_query(self); + +        if (res == PSYCO_POLL_OK && self->async_cursor) { +            /* An async query has just finished: parse the tuple in the +             * target cursor. */ +            cursorObject *curs = (cursorObject *)self->async_cursor; +            IFCLEARPGRES(curs->pgres); +            curs->pgres = pq_get_last_result(self); + +            /* fetch the tuples (if there are any) and build the result. We +             * don't care if pq_fetch return 0 or 1, but if there was an error, +             * we want to signal it to the caller. */ +            if (pq_fetch(curs) == -1) { +               res = PSYCO_POLL_ERROR; +            } + +            /* We have finished with our async_cursor */ +            Py_XDECREF(self->async_cursor); +            self->async_cursor = NULL; +        }          break;      default: @@ -922,12 +730,7 @@ conn_poll_green(connectionObject *self)          res = PSYCO_POLL_ERROR;      } -    if (!(res == PSYCO_POLL_ERROR && PyErr_Occurred())) { -        return PyInt_FromLong(res); -    } else { -        /* There is an error and an exception is already in place */ -        return NULL; -    } +    return res;  }  /* conn_close - do anything needed to shut down the connection */ diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 9b28b0f..af4d5e6 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -408,129 +408,19 @@ psyco_conn_get_exception(PyObject *self, void *closure)      return exception;  } -#define psyco_conn_poll_doc \ -"poll() -- return POLL_OK if the operation has finished, "           \ - "POLL_READ if the application should be waiting "                   \ - "for the socket to be readable or POLL_WRITE "                      \ - "if the socket should be writable." - -static PyObject * -psyco_conn_poll_async(connectionObject *self) -{ -    PostgresPollingStatusType poll_status; - -    Dprintf("conn_poll: polling with status %d", self->status); - -    switch (self->status) { - -    case CONN_STATUS_SETUP: -        /* according to libpq documentation the user should start by waiting -           for the socket to become writable */ -        self->status = CONN_STATUS_CONNECTING; -        return PyInt_FromLong(PSYCO_POLL_WRITE); - -    case CONN_STATUS_CONNECTING: -        /* this means we are in the middle of a PQconnectPoll loop */ -        break; - -    case CONN_STATUS_SEND_DATESTYLE: -    case CONN_STATUS_SENT_DATESTYLE: -    case CONN_STATUS_SEND_CLIENT_ENCODING: -    case CONN_STATUS_SENT_CLIENT_ENCODING: -        /* these mean that we need to wait for the socket to become writable -           to send the rest of our query */ -        return conn_poll_connect_send(self); - -    case CONN_STATUS_GET_DATESTYLE: -    case CONN_STATUS_GET_CLIENT_ENCODING: -        /* these mean that we are waiting for the results of the queries */ -        return conn_poll_connect_fetch(self); - -    case CONN_STATUS_READY: -    case CONN_STATUS_BEGIN: -        /* The connection is ready, but we might be in an asynchronous query, -           or we just might want to check for NOTIFYs.  For synchronous -           connections the status might be BEGIN, not READY. */ -        return conn_poll_ready(self); - -    default: -        /* everything else is an error */ -        PyErr_SetString(OperationalError, -                        "not in asynchronous connection attempt"); -        return NULL; - -    } - -    Py_BEGIN_ALLOW_THREADS; -    pthread_mutex_lock(&self->lock); - -    poll_status = PQconnectPoll(self->pgconn); - -    if (poll_status == PGRES_POLLING_READING) { -        pthread_mutex_unlock(&(self->lock)); -        Py_BLOCK_THREADS; -        Dprintf("conn_poll: returing POLL_READ"); -        return PyInt_FromLong(PSYCO_POLL_READ); -    } - -    if (poll_status == PGRES_POLLING_WRITING) { -        pthread_mutex_unlock(&(self->lock)); -        Py_BLOCK_THREADS; -        Dprintf("conn_poll: returing POLL_WRITE"); -        return PyInt_FromLong(PSYCO_POLL_WRITE); -    } - -    if (poll_status == PGRES_POLLING_FAILED) { -        pthread_mutex_unlock(&(self->lock)); -        Py_BLOCK_THREADS; -        PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); -        return NULL; -    } - -    /* the only other thing that PQconnectPoll can return is PGRES_POLLING_OK, -       but make sure */ -    if (poll_status != PGRES_POLLING_OK) { -        pthread_mutex_unlock(&(self->lock)); -        Py_BLOCK_THREADS; -        PyErr_Format(OperationalError, -                     "unexpected result from PQconnectPoll: %d", poll_status); -        return NULL; -    } - -    Dprintf("conn_poll: got POLL_OK"); - -    /* the connection is built, but we want to do a few other things before we -       let the user use it */ - -    self->equote = conn_get_standard_conforming_strings(self->pgconn); - -    Dprintf("conn_poll: got standard_conforming_strings"); - -    /* -     * Here is the tricky part, we need to figure the datestyle, -     * client_encoding and isolevel, all using nonblocking calls.  To do that -     * we will keep telling the user to poll, while we are waiting for our -     * asynchronous queries to complete. -     */ -    pthread_mutex_unlock(&(self->lock)); -    Py_END_ALLOW_THREADS; -    /* the next operation the client will do is send a query, so ask him to -       wait for a writable condition */ -    self->status = CONN_STATUS_SEND_DATESTYLE; -    Dprintf("conn_poll: connection is built, retrning %d", -            PSYCO_POLL_WRITE); -    return PyInt_FromLong(PSYCO_POLL_WRITE); -} -  static PyObject *  psyco_conn_poll(connectionObject *self)  { +    int res; +      EXC_IF_CONN_CLOSED(self); -    if (self->async) { -        return psyco_conn_poll_async(self); +    res = conn_poll(self); +    if (res != PSYCO_POLL_ERROR || !PyErr_Occurred()) { +        return PyInt_FromLong(res);      } else { -        return conn_poll_green(self); +        /* There is an error and an exception is already in place */ +        return NULL;      }  } diff --git a/psycopg/green.c b/psycopg/green.c index c9df519..6ad6595 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -147,14 +147,13 @@ psyco_exec_green(connectionObject *conn, const char *command)      PGresult *result = NULL;      PyObject *cb, *pyrv; +    Dprintf("psyco_exec_green: executing query async");      if (!(cb = have_wait_callback())) {          goto end;      }      /* Send the query asynchronously */ -    Dprintf("psyco_exec_green: sending query async"); -    if (0 == PQsendQuery(conn->pgconn, command)) { -        Dprintf("psyco_exec_green: PQsendQuery returned 0"); +    if (0 == pq_send_query(conn, command)) {          goto clear;      } diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 8821151..2cdd8eb 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -803,6 +803,27 @@ pq_execute(cursorObject *curs, const char *query, int async)      return 1-async;  } +/* send an async query to the backend. + * + * Return 1 if command succeeded, else 0. + * + * The function should be called helding the connection lock and the GIL. + */ +int +pq_send_query(connectionObject *conn, const char *query) +{ +    int rv; + +    Dprintf("pq_send_query: sending ASYNC query:"); +    Dprintf("    %-.200s", query); + +    if (0 == (rv = PQsendQuery(conn->pgconn, query))) { +        Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn)); +    } + +    return rv; +} +  /* Return the last result available on the connection.   *   * The function will block will block only if a command is active and the diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index f1cfc09..7f11383 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -38,6 +38,7 @@  HIDDEN PGresult *pq_get_last_result(connectionObject *conn);  HIDDEN int pq_fetch(cursorObject *curs);  HIDDEN int pq_execute(cursorObject *curs, const char *query, int async); +HIDDEN int pq_send_query(connectionObject *conn, const char *query);  HIDDEN int pq_begin_locked(connectionObject *conn, PGresult **pgres,                             char **error, PyThreadState **tstate);  HIDDEN int pq_commit(connectionObject *conn); | 
