diff options
| -rw-r--r-- | psycopg/connection.h | 17 | ||||
| -rw-r--r-- | psycopg/connection_int.c | 503 | ||||
| -rw-r--r-- | psycopg/connection_type.c | 124 | ||||
| -rw-r--r-- | psycopg/green.c | 5 | ||||
| -rw-r--r-- | psycopg/pqpath.c | 21 | ||||
| -rw-r--r-- | psycopg/pqpath.h | 1 |
6 files changed, 185 insertions, 486 deletions
diff --git a/psycopg/connection.h b/psycopg/connection.h index 8bf64fe..69f1385 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -45,16 +45,6 @@ extern "C" { #define CONN_STATUS_DATESTYLE 21 #define CONN_STATUS_CLIENT_ENCODING 22 -/* TODO: REMOVE THOSE */ -#define CONN_STATUS_SYNC 3 -#define CONN_STATUS_ASYNC 4 -#define CONN_STATUS_SEND_DATESTYLE 5 -#define CONN_STATUS_SENT_DATESTYLE 6 -#define CONN_STATUS_GET_DATESTYLE 7 -#define CONN_STATUS_SEND_CLIENT_ENCODING 8 -#define CONN_STATUS_SENT_CLIENT_ENCODING 9 -#define CONN_STATUS_GET_CLIENT_ENCODING 10 - /* async query execution status */ #define ASYNC_DONE 0 #define ASYNC_READ 1 @@ -137,12 +127,7 @@ HIDDEN int conn_commit(connectionObject *self); HIDDEN int conn_rollback(connectionObject *self); HIDDEN int conn_switch_isolation_level(connectionObject *self, int level); HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc); -HIDDEN PyObject *conn_poll_connect_send(connectionObject *self); -HIDDEN PyObject *conn_poll_connect_fetch(connectionObject *self); -HIDDEN PyObject *conn_poll_ready(connectionObject *self); -HIDDEN PyObject *conn_poll_send(connectionObject *self); -HIDDEN PyObject *conn_poll_fetch(connectionObject *self); -HIDDEN PyObject *conn_poll_green(connectionObject *self); +HIDDEN int conn_poll(connectionObject *self); /* exception-raising macros */ #define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index f398238..b878749 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -208,7 +208,6 @@ conn_get_encoding(PGresult *pgres) for (i=0 ; i < strlen(tmp) ; i++) encoding[i] = toupper(tmp[i]); encoding[i] = '\0'; - CLEARPGRES(pgres); return encoding; } @@ -258,10 +257,18 @@ conn_setup(connectionObject *self, PGconn *pgconn) { PGresult *pgres; + self->equote = conn_get_standard_conforming_strings(pgconn); + self->server_version = conn_get_server_version(pgconn); + self->protocol = conn_get_protocol_version(self->pgconn); + Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&self->lock); Py_BLOCK_THREADS; + if (pq_set_non_blocking(self, 1, 1) != 0) { + return -1; + } + if (!psyco_green()) { Py_UNBLOCK_THREADS; pgres = PQexec(pgconn, psyco_datestyle); @@ -305,6 +312,7 @@ conn_setup(connectionObject *self, PGconn *pgconn) Py_BLOCK_THREADS; return -1; } + CLEARPGRES(pgres); if (!psyco_green()) { Py_UNBLOCK_THREADS; @@ -334,8 +342,8 @@ conn_setup(connectionObject *self, PGconn *pgconn) /* conn_connect - execute a connection to the database */ -int -conn_sync_connect(connectionObject *self) +static int +_conn_sync_connect(connectionObject *self) { PGconn *pgconn; PyObject *wait_rv; @@ -382,19 +390,11 @@ conn_sync_connect(connectionObject *self) } } - self->equote = conn_get_standard_conforming_strings(pgconn); - self->server_version = conn_get_server_version(pgconn); - self->protocol = conn_get_protocol_version(self->pgconn); - /* From here the connection is considered ready: with the new status, * poll() will use PQisBusy instead of PQconnectPoll. */ self->status = CONN_STATUS_READY; - if (pq_set_non_blocking(self, 1, 1) != 0) { - return -1; - } - if (conn_setup(self, self->pgconn) == -1) { return -1; } @@ -403,7 +403,7 @@ conn_sync_connect(connectionObject *self) } static int -conn_async_connect(connectionObject *self) +_conn_async_connect(connectionObject *self) { PGconn *pgconn; @@ -426,7 +426,10 @@ conn_async_connect(connectionObject *self) PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self); - self->status = CONN_STATUS_SETUP; + /* The connection will be completed banging on poll(): + * First with _conn_poll_connecting() that will finish connection, + * then with _conn_poll_setup_async() that will do the same job + * of setup_async(). */ return 0; } @@ -436,340 +439,23 @@ conn_connect(connectionObject *self, long int async) { if (async == 1) { Dprintf("con_connect: connecting in ASYNC mode"); - return conn_async_connect(self); + return _conn_async_connect(self); } else { Dprintf("con_connect: connecting in SYNC mode"); - return conn_sync_connect(self); - } -} - -/* conn_poll_connect_send - handle connection polling when flushing output - during asynchronous connection attempt. */ - -PyObject * -conn_poll_connect_send(connectionObject *self) -{ - const char *query = NULL; - int next_status; - int ret; - - Dprintf("conn_poll_connect_send: status %d", self->status); - - switch (self->status) { - case CONN_STATUS_SEND_DATESTYLE: - /* set the datestyle */ - query = psyco_datestyle; - next_status = CONN_STATUS_SENT_DATESTYLE; - break; - case CONN_STATUS_SEND_CLIENT_ENCODING: - /* get the client_encoding */ - query = psyco_client_encoding; - next_status = CONN_STATUS_SENT_CLIENT_ENCODING; - break; - case CONN_STATUS_SENT_DATESTYLE: - case CONN_STATUS_SENT_CLIENT_ENCODING: - /* the query has only been partially sent */ - query = NULL; - next_status = self->status; - break; - default: - /* unexpected state, error out */ - PyErr_Format(OperationalError, - "unexpected state: %d", self->status); - return NULL; - } - - Dprintf("conn_poll_connect_send: sending query %-.200s", query); - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(self->lock)); - - if (query != NULL) { - if (PQsendQuery(self->pgconn, query) != 1) { - pthread_mutex_unlock(&(self->lock)); - Py_BLOCK_THREADS; - PyErr_SetString(OperationalError, - PQerrorMessage(self->pgconn)); - return NULL; - } - } - - if (PQflush(self->pgconn) == 0) { - /* the query got fully sent to the server */ - Dprintf("conn_poll_connect_send: query got flushed immediately"); - /* the return value will be POLL_READ */ - ret = PSYCO_POLL_READ; - - /* advance the next status, since we skip over the "waiting for the - query to be sent" status */ - switch (next_status) { - case CONN_STATUS_SENT_DATESTYLE: - next_status = CONN_STATUS_GET_DATESTYLE; - break; - case CONN_STATUS_SENT_CLIENT_ENCODING: - next_status = CONN_STATUS_GET_CLIENT_ENCODING; - break; - } - } - else { - /* query did not get sent completely, tell the client to wait for the - socket to become writable */ - ret = PSYCO_POLL_WRITE; - } - - self->status = next_status; - Dprintf("conn_poll_connect_send: next status is %d, returning %d", - self->status, ret); - - pthread_mutex_unlock(&(self->lock)); - Py_END_ALLOW_THREADS; - - return PyInt_FromLong(ret); -} - -/* conn_poll_connect_fetch - handle connection polling when reading result - during asynchronous connection attempt. */ - -PyObject * -conn_poll_connect_fetch(connectionObject *self) -{ - PGresult *pgres; - int is_busy; - int next_status; - int ret; - - Dprintf("conn_poll_connect_fetch: status %d", self->status); - - /* consume the input */ - is_busy = pq_is_busy(self); - if (is_busy == -1) { - /* there was an error, raise the exception */ - return NULL; - } - else if (is_busy == 1) { - /* the connection is busy, tell the user to wait more */ - Dprintf("conn_poll_connect_fetch: connection busy, returning %d", - PSYCO_POLL_READ); - return PyInt_FromLong(PSYCO_POLL_READ); - } - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(self->lock)); - - /* connection no longer busy, process input */ - pgres = PQgetResult(self->pgconn); - - /* do the rest while holding the GIL, we won't be calling into any - blocking API */ - pthread_mutex_unlock(&(self->lock)); - Py_END_ALLOW_THREADS; - - Dprintf("conn_poll_connect_fetch: got result %p", pgres); - - /* we expect COMMAND_OK (from SET) or TUPLES_OK (from SHOW) */ - if (pgres == NULL || (PQresultStatus(pgres) != PGRES_COMMAND_OK && - PQresultStatus(pgres) != PGRES_TUPLES_OK)) { - PyErr_SetString(OperationalError, "can't issue " - "initial connection queries"); - IFCLEARPGRES(pgres); - return NULL; - } - - if (self->status == CONN_STATUS_GET_DATESTYLE) { - /* got the result from SET DATESTYLE*/ - Dprintf("conn_poll_connect_fetch: datestyle set"); - next_status = CONN_STATUS_SEND_CLIENT_ENCODING; - } - else if (self->status == CONN_STATUS_GET_CLIENT_ENCODING) { - /* got the client_encoding */ - self->encoding = conn_get_encoding(pgres); - if (self->encoding == NULL) { - return NULL; - } - Dprintf("conn_poll_connect_fetch: got client_encoding %s", self->encoding); - - /* since this is the last step, set the other instance variables now */ - self->equote = conn_get_standard_conforming_strings(self->pgconn); - self->protocol = conn_get_protocol_version(self->pgconn); - self->server_version = conn_get_server_version(self->pgconn); - /* - * asynchronous connections always use isolation level 0, the user is - * expected to manage the transactions himself, by sending - * (asynchronously) BEGIN and COMMIT statements. - */ - self->isolation_level = 0; - - /* FIXME: this is a bug: the above queries were sent to the server - with a blocking connection */ - if (pq_set_non_blocking(self, 1, 1) != 0) { - return NULL; - } - - /* next status is going to READY */ - next_status = CONN_STATUS_READY; - } - else { - /* unexpected state, error out */ - PyErr_Format(OperationalError, - "unexpected state: %d", self->status); - return NULL; + return _conn_sync_connect(self); } - - /* clear any leftover result, there should be none, but the protocol - requires calling PQgetResult until you get a NULL */ - pq_clear_async(self); - - self->status = next_status; - - /* if the curent status is READY it means we got the result of the - last initialization query, so we return POLL_OK, otherwise we need to - send another query, so return POLL_WRITE */ - ret = self->status == CONN_STATUS_READY ? PSYCO_POLL_OK : PSYCO_POLL_WRITE; - Dprintf("conn_poll_connect_fetch: next status is %d, returning %d", - self->status, ret); - return PyInt_FromLong(ret); } -/* conn_poll_ready - handle connection polling when it is already open */ - -PyObject * -conn_poll_ready(connectionObject *self) -{ - int is_busy; - - /* if there is an asynchronous query underway, poll it */ - if (self->async_cursor != NULL) { - if (self->async_status == ASYNC_WRITE) { - return conn_poll_send(self); - } - else { - /* this gets called both for ASYNC_READ and ASYNC_DONE, because - even if the async query is complete, we still might want to - check for NOTIFYs */ - return conn_poll_fetch(self); - } - } - - /* otherwise just check for NOTIFYs */ - is_busy = pq_is_busy(self); - if (is_busy == -1) { - /* there was an error, raise the exception */ - return NULL; - } - else if (is_busy == 1) { - /* the connection is busy, tell the user to wait more */ - Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_READ); - return PyInt_FromLong(PSYCO_POLL_READ); - } - else { - /* connection is idle */ - Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_OK); - return PyInt_FromLong(PSYCO_POLL_OK); - } -} - -/* conn_poll_send - poll the connection when flushing data to the backend */ - -PyObject * -conn_poll_send(connectionObject *self) -{ - int res; - - /* flush queued output to the server */ - res = pq_flush(self); - - if (res == 1) { - /* some data still waiting to be flushed */ - Dprintf("conn_poll_send: returning %d", PSYCO_POLL_WRITE); - return PyInt_FromLong(PSYCO_POLL_WRITE); - } - else if (res == 0) { - /* all data flushed, start waiting for results */ - Dprintf("conn_poll_send: returning %d", PSYCO_POLL_READ); - self->async_status = ASYNC_READ; - return PyInt_FromLong(PSYCO_POLL_READ); - } - else { - /* unexpected result */ - PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); - return NULL; - } -} - -/* conn_poll_fetch - poll the connection when reading results from the backend - * - * If self_curs is available, use it to store the result of the last query. - * Also unlink it when finished. - */ - -PyObject * -conn_poll_fetch(connectionObject *self) -{ - int is_busy; - int last_result; - - /* consume the input */ - is_busy = pq_is_busy(self); - if (is_busy == -1) { - /* there was an error, raise the exception */ - return NULL; - } - else if (is_busy == 1) { - /* the connection is busy, tell the user to wait more */ - Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_READ); - return PyInt_FromLong(PSYCO_POLL_READ); - } - - /* try to fetch the data only if this was a poll following a read - request; else just return POLL_OK to the user: this is necessary - because of asynchronous NOTIFYs that can be sent by the backend - even if the user didn't asked for them */ - - if (self->async_status == ASYNC_READ && self->async_cursor) { - cursorObject *curs = (cursorObject *)self->async_cursor; - IFCLEARPGRES(curs->pgres); - curs->pgres = pq_get_last_result(self); - - /* fetch the tuples (if there are any) and build the result. We don't - * care if pq_fetch return 0 or 1, but if there was an error, we want to - * signal it to the caller. */ - last_result = pq_fetch(curs) == -1 ? -1 : 0; - - /* We have finished with our async_cursor */ - Py_XDECREF(self->async_cursor); - self->async_cursor = NULL; - } - else { - last_result = 0; - } - - if (last_result == 0) { - Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_OK); - /* self->async_status cannot be ASYNC_WRITE here, because we - never execute curs_poll_fetch in ASYNC_WRITE state, so we can - safely set it to ASYNC_DONE because we either fetched the result or - there is no result to fetch */ - self->async_status = ASYNC_DONE; - return PyInt_FromLong(PSYCO_POLL_OK); - } - else if (last_result == 1) { - Dprintf("conn_poll_fetch: got result, but data remaining, " - "returning %d", PSYCO_POLL_READ); - return PyInt_FromLong(PSYCO_POLL_READ); - } - else { - return NULL; - } -} /* poll during a connection attempt until the connection has established. */ -int +static int _conn_poll_connecting(connectionObject *self) { int res = PSYCO_POLL_ERROR; + Dprintf("conn_poll: poll connecting"); switch (PQconnectPoll(self->pgconn)) { case PGRES_POLLING_OK: res = PSYCO_POLL_OK; @@ -792,11 +478,13 @@ _conn_poll_connecting(connectionObject *self) /* Advance to the next state after an attempt of flushing output */ -int + +static int _conn_poll_advance_write(connectionObject *self, int flush) { int res; + Dprintf("conn_poll: poll writing"); switch (flush) { case 0: /* success */ /* we've finished pushing the query to the server. Let's start @@ -821,11 +509,12 @@ _conn_poll_advance_write(connectionObject *self, int flush) } /* Advance to the next state after a call to a pq_is_busy* function */ -int +static int _conn_poll_advance_read(connectionObject *self, int busy) { int res; + Dprintf("conn_poll: poll reading"); switch (busy) { case 0: /* result is ready */ res = PSYCO_POLL_OK; @@ -850,7 +539,8 @@ _conn_poll_advance_read(connectionObject *self, int busy) Advance the async_status (usually going WRITE -> READ -> DONE) but don't mess with the connection status. */ -int + +static int _conn_poll_query(connectionObject *self) { int res = PSYCO_POLL_ERROR; @@ -891,30 +581,148 @@ _conn_poll_query(connectionObject *self) return res; } +/* Advance to the next state during an async connection setup + * + * If the connection is green, this is performed by the regular + * sync code so the queries are sent by conn_setup() while in + * CONN_STATUS_READY state. + */ +static int +_conn_poll_setup_async(connectionObject *self) +{ + int res = PSYCO_POLL_ERROR; + PGresult *pgres; + + switch (self->status) { + case CONN_STATUS_CONNECTING: + /* Set the connection to nonblocking now. */ + if (pq_set_non_blocking(self, 1, 1) != 0) { + break; + } + + self->equote = conn_get_standard_conforming_strings(self->pgconn); + self->protocol = conn_get_protocol_version(self->pgconn); + self->server_version = conn_get_server_version(self->pgconn); -/* conn_poll_green - poll a *sync* connection with external wait */ + /* asynchronous connections always use isolation level 0, the user is + * expected to manage the transactions himself, by sending + * (asynchronously) BEGIN and COMMIT statements. + */ + self->isolation_level = 0; -PyObject * -conn_poll_green(connectionObject *self) + Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE"); + self->status = CONN_STATUS_DATESTYLE; + if (0 == pq_send_query(self, psyco_datestyle)) { + PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); + break; + } + Dprintf("conn_poll: async_status -> ASYNC_WRITE"); + self->async_status = ASYNC_WRITE; + res = PSYCO_POLL_WRITE; + break; + + case CONN_STATUS_DATESTYLE: + res = _conn_poll_query(self); + if (res == PSYCO_POLL_OK) { + res = PSYCO_POLL_ERROR; + pgres = pq_get_last_result(self); + if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) { + PyErr_SetString(OperationalError, "can't set datestyle to ISO"); + break; + } + CLEARPGRES(pgres); + + Dprintf("conn_poll: status -> CONN_STATUS_CLIENT_ENCODING"); + self->status = CONN_STATUS_CLIENT_ENCODING; + if (0 == pq_send_query(self, psyco_client_encoding)) { + PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); + break; + } + Dprintf("conn_poll: async_status -> ASYNC_WRITE"); + self->async_status = ASYNC_WRITE; + res = PSYCO_POLL_WRITE; + } + break; + + case CONN_STATUS_CLIENT_ENCODING: + res = _conn_poll_query(self); + if (res == PSYCO_POLL_OK) { + res = PSYCO_POLL_ERROR; + pgres = pq_get_last_result(self); + if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) { + PyErr_SetString(OperationalError, "can't fetch client_encoding"); + break; + } + + /* conn_get_encoding returns a malloc'd string */ + self->encoding = conn_get_encoding(pgres); + CLEARPGRES(pgres); + if (self->encoding == NULL) { break; } + + Dprintf("conn_poll: status -> CONN_STATUS_READY"); + self->status = CONN_STATUS_READY; + res = PSYCO_POLL_OK; + } + break; + } + + return res; +} + + +/* conn_poll - Main polling switch + * + * The function is called in all the states and connection types and invokes + * the right "next step". + */ + +int +conn_poll(connectionObject *self) { int res = PSYCO_POLL_ERROR; + Dprintf("conn_poll: status = %d", self->status); switch (self->status) { case CONN_STATUS_SETUP: - Dprintf("conn_poll: status = CONN_STATUS_SETUP"); + Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING"); self->status = CONN_STATUS_CONNECTING; - res = PSYCO_POLL_WRITE; + res = PSYCO_POLL_WRITE; break; case CONN_STATUS_CONNECTING: - Dprintf("conn_poll: status = CONN_STATUS_CONNECTING"); res = _conn_poll_connecting(self); + if (res == PSYCO_POLL_OK && self->async) { + res = _conn_poll_setup_async(self); + } + break; + + case CONN_STATUS_DATESTYLE: + case CONN_STATUS_CLIENT_ENCODING: + res = _conn_poll_setup_async(self); break; case CONN_STATUS_READY: case CONN_STATUS_BEGIN: - Dprintf("conn_poll: status = CONN_STATUS_READY/BEGIN"); res = _conn_poll_query(self); + + if (res == PSYCO_POLL_OK && self->async_cursor) { + /* An async query has just finished: parse the tuple in the + * target cursor. */ + cursorObject *curs = (cursorObject *)self->async_cursor; + IFCLEARPGRES(curs->pgres); + curs->pgres = pq_get_last_result(self); + + /* fetch the tuples (if there are any) and build the result. We + * don't care if pq_fetch return 0 or 1, but if there was an error, + * we want to signal it to the caller. */ + if (pq_fetch(curs) == -1) { + res = PSYCO_POLL_ERROR; + } + + /* We have finished with our async_cursor */ + Py_XDECREF(self->async_cursor); + self->async_cursor = NULL; + } break; default: @@ -922,12 +730,7 @@ conn_poll_green(connectionObject *self) res = PSYCO_POLL_ERROR; } - if (!(res == PSYCO_POLL_ERROR && PyErr_Occurred())) { - return PyInt_FromLong(res); - } else { - /* There is an error and an exception is already in place */ - return NULL; - } + return res; } /* conn_close - do anything needed to shut down the connection */ diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 9b28b0f..af4d5e6 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -408,129 +408,19 @@ psyco_conn_get_exception(PyObject *self, void *closure) return exception; } -#define psyco_conn_poll_doc \ -"poll() -- return POLL_OK if the operation has finished, " \ - "POLL_READ if the application should be waiting " \ - "for the socket to be readable or POLL_WRITE " \ - "if the socket should be writable." - -static PyObject * -psyco_conn_poll_async(connectionObject *self) -{ - PostgresPollingStatusType poll_status; - - Dprintf("conn_poll: polling with status %d", self->status); - - switch (self->status) { - - case CONN_STATUS_SETUP: - /* according to libpq documentation the user should start by waiting - for the socket to become writable */ - self->status = CONN_STATUS_CONNECTING; - return PyInt_FromLong(PSYCO_POLL_WRITE); - - case CONN_STATUS_CONNECTING: - /* this means we are in the middle of a PQconnectPoll loop */ - break; - - case CONN_STATUS_SEND_DATESTYLE: - case CONN_STATUS_SENT_DATESTYLE: - case CONN_STATUS_SEND_CLIENT_ENCODING: - case CONN_STATUS_SENT_CLIENT_ENCODING: - /* these mean that we need to wait for the socket to become writable - to send the rest of our query */ - return conn_poll_connect_send(self); - - case CONN_STATUS_GET_DATESTYLE: - case CONN_STATUS_GET_CLIENT_ENCODING: - /* these mean that we are waiting for the results of the queries */ - return conn_poll_connect_fetch(self); - - case CONN_STATUS_READY: - case CONN_STATUS_BEGIN: - /* The connection is ready, but we might be in an asynchronous query, - or we just might want to check for NOTIFYs. For synchronous - connections the status might be BEGIN, not READY. */ - return conn_poll_ready(self); - - default: - /* everything else is an error */ - PyErr_SetString(OperationalError, - "not in asynchronous connection attempt"); - return NULL; - - } - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&self->lock); - - poll_status = PQconnectPoll(self->pgconn); - - if (poll_status == PGRES_POLLING_READING) { - pthread_mutex_unlock(&(self->lock)); - Py_BLOCK_THREADS; - Dprintf("conn_poll: returing POLL_READ"); - return PyInt_FromLong(PSYCO_POLL_READ); - } - - if (poll_status == PGRES_POLLING_WRITING) { - pthread_mutex_unlock(&(self->lock)); - Py_BLOCK_THREADS; - Dprintf("conn_poll: returing POLL_WRITE"); - return PyInt_FromLong(PSYCO_POLL_WRITE); - } - - if (poll_status == PGRES_POLLING_FAILED) { - pthread_mutex_unlock(&(self->lock)); - Py_BLOCK_THREADS; - PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); - return NULL; - } - - /* the only other thing that PQconnectPoll can return is PGRES_POLLING_OK, - but make sure */ - if (poll_status != PGRES_POLLING_OK) { - pthread_mutex_unlock(&(self->lock)); - Py_BLOCK_THREADS; - PyErr_Format(OperationalError, - "unexpected result from PQconnectPoll: %d", poll_status); - return NULL; - } - - Dprintf("conn_poll: got POLL_OK"); - - /* the connection is built, but we want to do a few other things before we - let the user use it */ - - self->equote = conn_get_standard_conforming_strings(self->pgconn); - - Dprintf("conn_poll: got standard_conforming_strings"); - - /* - * Here is the tricky part, we need to figure the datestyle, - * client_encoding and isolevel, all using nonblocking calls. To do that - * we will keep telling the user to poll, while we are waiting for our - * asynchronous queries to complete. - */ - pthread_mutex_unlock(&(self->lock)); - Py_END_ALLOW_THREADS; - /* the next operation the client will do is send a query, so ask him to - wait for a writable condition */ - self->status = CONN_STATUS_SEND_DATESTYLE; - Dprintf("conn_poll: connection is built, retrning %d", - PSYCO_POLL_WRITE); - return PyInt_FromLong(PSYCO_POLL_WRITE); -} - static PyObject * psyco_conn_poll(connectionObject *self) { + int res; + EXC_IF_CONN_CLOSED(self); - if (self->async) { - return psyco_conn_poll_async(self); + res = conn_poll(self); + if (res != PSYCO_POLL_ERROR || !PyErr_Occurred()) { + return PyInt_FromLong(res); } else { - return conn_poll_green(self); + /* There is an error and an exception is already in place */ + return NULL; } } diff --git a/psycopg/green.c b/psycopg/green.c index c9df519..6ad6595 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -147,14 +147,13 @@ psyco_exec_green(connectionObject *conn, const char *command) PGresult *result = NULL; PyObject *cb, *pyrv; + Dprintf("psyco_exec_green: executing query async"); if (!(cb = have_wait_callback())) { goto end; } /* Send the query asynchronously */ - Dprintf("psyco_exec_green: sending query async"); - if (0 == PQsendQuery(conn->pgconn, command)) { - Dprintf("psyco_exec_green: PQsendQuery returned 0"); + if (0 == pq_send_query(conn, command)) { goto clear; } diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 8821151..2cdd8eb 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -803,6 +803,27 @@ pq_execute(cursorObject *curs, const char *query, int async) return 1-async; } +/* send an async query to the backend. + * + * Return 1 if command succeeded, else 0. + * + * The function should be called helding the connection lock and the GIL. + */ +int +pq_send_query(connectionObject *conn, const char *query) +{ + int rv; + + Dprintf("pq_send_query: sending ASYNC query:"); + Dprintf(" %-.200s", query); + + if (0 == (rv = PQsendQuery(conn->pgconn, query))) { + Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn)); + } + + return rv; +} + /* Return the last result available on the connection. * * The function will block will block only if a command is active and the diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index f1cfc09..7f11383 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -38,6 +38,7 @@ HIDDEN PGresult *pq_get_last_result(connectionObject *conn); HIDDEN int pq_fetch(cursorObject *curs); HIDDEN int pq_execute(cursorObject *curs, const char *query, int async); +HIDDEN int pq_send_query(connectionObject *conn, const char *query); HIDDEN int pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error, PyThreadState **tstate); HIDDEN int pq_commit(connectionObject *conn); |
