diff options
| author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-04-22 19:59:00 +0100 |
|---|---|---|
| committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-04-23 13:28:49 +0100 |
| commit | cb40342afa383176684c805789782403ee49a0a7 (patch) | |
| tree | 1009d985c53a801958d3a55d760edbfd6e15d87a /psycopg/connection_int.c | |
| parent | 0da4befe786679c557ef4ae52db486d704fc818d (diff) | |
| download | psycopg2-cb40342afa383176684c805789782403ee49a0a7.tar.gz | |
poll implementation for async, sync and green connection unified.
Diffstat (limited to 'psycopg/connection_int.c')
| -rw-r--r-- | psycopg/connection_int.c | 503 |
1 files changed, 153 insertions, 350 deletions
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index f398238..b878749 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -208,7 +208,6 @@ conn_get_encoding(PGresult *pgres) for (i=0 ; i < strlen(tmp) ; i++) encoding[i] = toupper(tmp[i]); encoding[i] = '\0'; - CLEARPGRES(pgres); return encoding; } @@ -258,10 +257,18 @@ conn_setup(connectionObject *self, PGconn *pgconn) { PGresult *pgres; + self->equote = conn_get_standard_conforming_strings(pgconn); + self->server_version = conn_get_server_version(pgconn); + self->protocol = conn_get_protocol_version(self->pgconn); + Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&self->lock); Py_BLOCK_THREADS; + if (pq_set_non_blocking(self, 1, 1) != 0) { + return -1; + } + if (!psyco_green()) { Py_UNBLOCK_THREADS; pgres = PQexec(pgconn, psyco_datestyle); @@ -305,6 +312,7 @@ conn_setup(connectionObject *self, PGconn *pgconn) Py_BLOCK_THREADS; return -1; } + CLEARPGRES(pgres); if (!psyco_green()) { Py_UNBLOCK_THREADS; @@ -334,8 +342,8 @@ conn_setup(connectionObject *self, PGconn *pgconn) /* conn_connect - execute a connection to the database */ -int -conn_sync_connect(connectionObject *self) +static int +_conn_sync_connect(connectionObject *self) { PGconn *pgconn; PyObject *wait_rv; @@ -382,19 +390,11 @@ conn_sync_connect(connectionObject *self) } } - self->equote = conn_get_standard_conforming_strings(pgconn); - self->server_version = conn_get_server_version(pgconn); - self->protocol = conn_get_protocol_version(self->pgconn); - /* From here the connection is considered ready: with the new status, * poll() will use PQisBusy instead of PQconnectPoll. */ self->status = CONN_STATUS_READY; - if (pq_set_non_blocking(self, 1, 1) != 0) { - return -1; - } - if (conn_setup(self, self->pgconn) == -1) { return -1; } @@ -403,7 +403,7 @@ conn_sync_connect(connectionObject *self) } static int -conn_async_connect(connectionObject *self) +_conn_async_connect(connectionObject *self) { PGconn *pgconn; @@ -426,7 +426,10 @@ conn_async_connect(connectionObject *self) PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self); - self->status = CONN_STATUS_SETUP; + /* The connection will be completed banging on poll(): + * First with _conn_poll_connecting() that will finish connection, + * then with _conn_poll_setup_async() that will do the same job + * of setup_async(). */ return 0; } @@ -436,340 +439,23 @@ conn_connect(connectionObject *self, long int async) { if (async == 1) { Dprintf("con_connect: connecting in ASYNC mode"); - return conn_async_connect(self); + return _conn_async_connect(self); } else { Dprintf("con_connect: connecting in SYNC mode"); - return conn_sync_connect(self); - } -} - -/* conn_poll_connect_send - handle connection polling when flushing output - during asynchronous connection attempt. */ - -PyObject * -conn_poll_connect_send(connectionObject *self) -{ - const char *query = NULL; - int next_status; - int ret; - - Dprintf("conn_poll_connect_send: status %d", self->status); - - switch (self->status) { - case CONN_STATUS_SEND_DATESTYLE: - /* set the datestyle */ - query = psyco_datestyle; - next_status = CONN_STATUS_SENT_DATESTYLE; - break; - case CONN_STATUS_SEND_CLIENT_ENCODING: - /* get the client_encoding */ - query = psyco_client_encoding; - next_status = CONN_STATUS_SENT_CLIENT_ENCODING; - break; - case CONN_STATUS_SENT_DATESTYLE: - case CONN_STATUS_SENT_CLIENT_ENCODING: - /* the query has only been partially sent */ - query = NULL; - next_status = self->status; - break; - default: - /* unexpected state, error out */ - PyErr_Format(OperationalError, - "unexpected state: %d", self->status); - return NULL; - } - - Dprintf("conn_poll_connect_send: sending query %-.200s", query); - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(self->lock)); - - if (query != NULL) { - if (PQsendQuery(self->pgconn, query) != 1) { - pthread_mutex_unlock(&(self->lock)); - Py_BLOCK_THREADS; - PyErr_SetString(OperationalError, - PQerrorMessage(self->pgconn)); - return NULL; - } - } - - if (PQflush(self->pgconn) == 0) { - /* the query got fully sent to the server */ - Dprintf("conn_poll_connect_send: query got flushed immediately"); - /* the return value will be POLL_READ */ - ret = PSYCO_POLL_READ; - - /* advance the next status, since we skip over the "waiting for the - query to be sent" status */ - switch (next_status) { - case CONN_STATUS_SENT_DATESTYLE: - next_status = CONN_STATUS_GET_DATESTYLE; - break; - case CONN_STATUS_SENT_CLIENT_ENCODING: - next_status = CONN_STATUS_GET_CLIENT_ENCODING; - break; - } - } - else { - /* query did not get sent completely, tell the client to wait for the - socket to become writable */ - ret = PSYCO_POLL_WRITE; - } - - self->status = next_status; - Dprintf("conn_poll_connect_send: next status is %d, returning %d", - self->status, ret); - - pthread_mutex_unlock(&(self->lock)); - Py_END_ALLOW_THREADS; - - return PyInt_FromLong(ret); -} - -/* conn_poll_connect_fetch - handle connection polling when reading result - during asynchronous connection attempt. */ - -PyObject * -conn_poll_connect_fetch(connectionObject *self) -{ - PGresult *pgres; - int is_busy; - int next_status; - int ret; - - Dprintf("conn_poll_connect_fetch: status %d", self->status); - - /* consume the input */ - is_busy = pq_is_busy(self); - if (is_busy == -1) { - /* there was an error, raise the exception */ - return NULL; - } - else if (is_busy == 1) { - /* the connection is busy, tell the user to wait more */ - Dprintf("conn_poll_connect_fetch: connection busy, returning %d", - PSYCO_POLL_READ); - return PyInt_FromLong(PSYCO_POLL_READ); - } - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(self->lock)); - - /* connection no longer busy, process input */ - pgres = PQgetResult(self->pgconn); - - /* do the rest while holding the GIL, we won't be calling into any - blocking API */ - pthread_mutex_unlock(&(self->lock)); - Py_END_ALLOW_THREADS; - - Dprintf("conn_poll_connect_fetch: got result %p", pgres); - - /* we expect COMMAND_OK (from SET) or TUPLES_OK (from SHOW) */ - if (pgres == NULL || (PQresultStatus(pgres) != PGRES_COMMAND_OK && - PQresultStatus(pgres) != PGRES_TUPLES_OK)) { - PyErr_SetString(OperationalError, "can't issue " - "initial connection queries"); - IFCLEARPGRES(pgres); - return NULL; - } - - if (self->status == CONN_STATUS_GET_DATESTYLE) { - /* got the result from SET DATESTYLE*/ - Dprintf("conn_poll_connect_fetch: datestyle set"); - next_status = CONN_STATUS_SEND_CLIENT_ENCODING; - } - else if (self->status == CONN_STATUS_GET_CLIENT_ENCODING) { - /* got the client_encoding */ - self->encoding = conn_get_encoding(pgres); - if (self->encoding == NULL) { - return NULL; - } - Dprintf("conn_poll_connect_fetch: got client_encoding %s", self->encoding); - - /* since this is the last step, set the other instance variables now */ - self->equote = conn_get_standard_conforming_strings(self->pgconn); - self->protocol = conn_get_protocol_version(self->pgconn); - self->server_version = conn_get_server_version(self->pgconn); - /* - * asynchronous connections always use isolation level 0, the user is - * expected to manage the transactions himself, by sending - * (asynchronously) BEGIN and COMMIT statements. - */ - self->isolation_level = 0; - - /* FIXME: this is a bug: the above queries were sent to the server - with a blocking connection */ - if (pq_set_non_blocking(self, 1, 1) != 0) { - return NULL; - } - - /* next status is going to READY */ - next_status = CONN_STATUS_READY; - } - else { - /* unexpected state, error out */ - PyErr_Format(OperationalError, - "unexpected state: %d", self->status); - return NULL; + return _conn_sync_connect(self); } - - /* clear any leftover result, there should be none, but the protocol - requires calling PQgetResult until you get a NULL */ - pq_clear_async(self); - - self->status = next_status; - - /* if the curent status is READY it means we got the result of the - last initialization query, so we return POLL_OK, otherwise we need to - send another query, so return POLL_WRITE */ - ret = self->status == CONN_STATUS_READY ? PSYCO_POLL_OK : PSYCO_POLL_WRITE; - Dprintf("conn_poll_connect_fetch: next status is %d, returning %d", - self->status, ret); - return PyInt_FromLong(ret); } -/* conn_poll_ready - handle connection polling when it is already open */ - -PyObject * -conn_poll_ready(connectionObject *self) -{ - int is_busy; - - /* if there is an asynchronous query underway, poll it */ - if (self->async_cursor != NULL) { - if (self->async_status == ASYNC_WRITE) { - return conn_poll_send(self); - } - else { - /* this gets called both for ASYNC_READ and ASYNC_DONE, because - even if the async query is complete, we still might want to - check for NOTIFYs */ - return conn_poll_fetch(self); - } - } - - /* otherwise just check for NOTIFYs */ - is_busy = pq_is_busy(self); - if (is_busy == -1) { - /* there was an error, raise the exception */ - return NULL; - } - else if (is_busy == 1) { - /* the connection is busy, tell the user to wait more */ - Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_READ); - return PyInt_FromLong(PSYCO_POLL_READ); - } - else { - /* connection is idle */ - Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_OK); - return PyInt_FromLong(PSYCO_POLL_OK); - } -} - -/* conn_poll_send - poll the connection when flushing data to the backend */ - -PyObject * -conn_poll_send(connectionObject *self) -{ - int res; - - /* flush queued output to the server */ - res = pq_flush(self); - - if (res == 1) { - /* some data still waiting to be flushed */ - Dprintf("conn_poll_send: returning %d", PSYCO_POLL_WRITE); - return PyInt_FromLong(PSYCO_POLL_WRITE); - } - else if (res == 0) { - /* all data flushed, start waiting for results */ - Dprintf("conn_poll_send: returning %d", PSYCO_POLL_READ); - self->async_status = ASYNC_READ; - return PyInt_FromLong(PSYCO_POLL_READ); - } - else { - /* unexpected result */ - PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); - return NULL; - } -} - -/* conn_poll_fetch - poll the connection when reading results from the backend - * - * If self_curs is available, use it to store the result of the last query. - * Also unlink it when finished. - */ - -PyObject * -conn_poll_fetch(connectionObject *self) -{ - int is_busy; - int last_result; - - /* consume the input */ - is_busy = pq_is_busy(self); - if (is_busy == -1) { - /* there was an error, raise the exception */ - return NULL; - } - else if (is_busy == 1) { - /* the connection is busy, tell the user to wait more */ - Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_READ); - return PyInt_FromLong(PSYCO_POLL_READ); - } - - /* try to fetch the data only if this was a poll following a read - request; else just return POLL_OK to the user: this is necessary - because of asynchronous NOTIFYs that can be sent by the backend - even if the user didn't asked for them */ - - if (self->async_status == ASYNC_READ && self->async_cursor) { - cursorObject *curs = (cursorObject *)self->async_cursor; - IFCLEARPGRES(curs->pgres); - curs->pgres = pq_get_last_result(self); - - /* fetch the tuples (if there are any) and build the result. We don't - * care if pq_fetch return 0 or 1, but if there was an error, we want to - * signal it to the caller. */ - last_result = pq_fetch(curs) == -1 ? -1 : 0; - - /* We have finished with our async_cursor */ - Py_XDECREF(self->async_cursor); - self->async_cursor = NULL; - } - else { - last_result = 0; - } - - if (last_result == 0) { - Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_OK); - /* self->async_status cannot be ASYNC_WRITE here, because we - never execute curs_poll_fetch in ASYNC_WRITE state, so we can - safely set it to ASYNC_DONE because we either fetched the result or - there is no result to fetch */ - self->async_status = ASYNC_DONE; - return PyInt_FromLong(PSYCO_POLL_OK); - } - else if (last_result == 1) { - Dprintf("conn_poll_fetch: got result, but data remaining, " - "returning %d", PSYCO_POLL_READ); - return PyInt_FromLong(PSYCO_POLL_READ); - } - else { - return NULL; - } -} /* poll during a connection attempt until the connection has established. */ -int +static int _conn_poll_connecting(connectionObject *self) { int res = PSYCO_POLL_ERROR; + Dprintf("conn_poll: poll connecting"); switch (PQconnectPoll(self->pgconn)) { case PGRES_POLLING_OK: res = PSYCO_POLL_OK; @@ -792,11 +478,13 @@ _conn_poll_connecting(connectionObject *self) /* Advance to the next state after an attempt of flushing output */ -int + +static int _conn_poll_advance_write(connectionObject *self, int flush) { int res; + Dprintf("conn_poll: poll writing"); switch (flush) { case 0: /* success */ /* we've finished pushing the query to the server. Let's start @@ -821,11 +509,12 @@ _conn_poll_advance_write(connectionObject *self, int flush) } /* Advance to the next state after a call to a pq_is_busy* function */ -int +static int _conn_poll_advance_read(connectionObject *self, int busy) { int res; + Dprintf("conn_poll: poll reading"); switch (busy) { case 0: /* result is ready */ res = PSYCO_POLL_OK; @@ -850,7 +539,8 @@ _conn_poll_advance_read(connectionObject *self, int busy) Advance the async_status (usually going WRITE -> READ -> DONE) but don't mess with the connection status. */ -int + +static int _conn_poll_query(connectionObject *self) { int res = PSYCO_POLL_ERROR; @@ -891,30 +581,148 @@ _conn_poll_query(connectionObject *self) return res; } +/* Advance to the next state during an async connection setup + * + * If the connection is green, this is performed by the regular + * sync code so the queries are sent by conn_setup() while in + * CONN_STATUS_READY state. + */ +static int +_conn_poll_setup_async(connectionObject *self) +{ + int res = PSYCO_POLL_ERROR; + PGresult *pgres; + + switch (self->status) { + case CONN_STATUS_CONNECTING: + /* Set the connection to nonblocking now. */ + if (pq_set_non_blocking(self, 1, 1) != 0) { + break; + } + + self->equote = conn_get_standard_conforming_strings(self->pgconn); + self->protocol = conn_get_protocol_version(self->pgconn); + self->server_version = conn_get_server_version(self->pgconn); -/* conn_poll_green - poll a *sync* connection with external wait */ + /* asynchronous connections always use isolation level 0, the user is + * expected to manage the transactions himself, by sending + * (asynchronously) BEGIN and COMMIT statements. + */ + self->isolation_level = 0; -PyObject * -conn_poll_green(connectionObject *self) + Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE"); + self->status = CONN_STATUS_DATESTYLE; + if (0 == pq_send_query(self, psyco_datestyle)) { + PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); + break; + } + Dprintf("conn_poll: async_status -> ASYNC_WRITE"); + self->async_status = ASYNC_WRITE; + res = PSYCO_POLL_WRITE; + break; + + case CONN_STATUS_DATESTYLE: + res = _conn_poll_query(self); + if (res == PSYCO_POLL_OK) { + res = PSYCO_POLL_ERROR; + pgres = pq_get_last_result(self); + if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) { + PyErr_SetString(OperationalError, "can't set datestyle to ISO"); + break; + } + CLEARPGRES(pgres); + + Dprintf("conn_poll: status -> CONN_STATUS_CLIENT_ENCODING"); + self->status = CONN_STATUS_CLIENT_ENCODING; + if (0 == pq_send_query(self, psyco_client_encoding)) { + PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); + break; + } + Dprintf("conn_poll: async_status -> ASYNC_WRITE"); + self->async_status = ASYNC_WRITE; + res = PSYCO_POLL_WRITE; + } + break; + + case CONN_STATUS_CLIENT_ENCODING: + res = _conn_poll_query(self); + if (res == PSYCO_POLL_OK) { + res = PSYCO_POLL_ERROR; + pgres = pq_get_last_result(self); + if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) { + PyErr_SetString(OperationalError, "can't fetch client_encoding"); + break; + } + + /* conn_get_encoding returns a malloc'd string */ + self->encoding = conn_get_encoding(pgres); + CLEARPGRES(pgres); + if (self->encoding == NULL) { break; } + + Dprintf("conn_poll: status -> CONN_STATUS_READY"); + self->status = CONN_STATUS_READY; + res = PSYCO_POLL_OK; + } + break; + } + + return res; +} + + +/* conn_poll - Main polling switch + * + * The function is called in all the states and connection types and invokes + * the right "next step". + */ + +int +conn_poll(connectionObject *self) { int res = PSYCO_POLL_ERROR; + Dprintf("conn_poll: status = %d", self->status); switch (self->status) { case CONN_STATUS_SETUP: - Dprintf("conn_poll: status = CONN_STATUS_SETUP"); + Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING"); self->status = CONN_STATUS_CONNECTING; - res = PSYCO_POLL_WRITE; + res = PSYCO_POLL_WRITE; break; case CONN_STATUS_CONNECTING: - Dprintf("conn_poll: status = CONN_STATUS_CONNECTING"); res = _conn_poll_connecting(self); + if (res == PSYCO_POLL_OK && self->async) { + res = _conn_poll_setup_async(self); + } + break; + + case CONN_STATUS_DATESTYLE: + case CONN_STATUS_CLIENT_ENCODING: + res = _conn_poll_setup_async(self); break; case CONN_STATUS_READY: case CONN_STATUS_BEGIN: - Dprintf("conn_poll: status = CONN_STATUS_READY/BEGIN"); res = _conn_poll_query(self); + + if (res == PSYCO_POLL_OK && self->async_cursor) { + /* An async query has just finished: parse the tuple in the + * target cursor. */ + cursorObject *curs = (cursorObject *)self->async_cursor; + IFCLEARPGRES(curs->pgres); + curs->pgres = pq_get_last_result(self); + + /* fetch the tuples (if there are any) and build the result. We + * don't care if pq_fetch return 0 or 1, but if there was an error, + * we want to signal it to the caller. */ + if (pq_fetch(curs) == -1) { + res = PSYCO_POLL_ERROR; + } + + /* We have finished with our async_cursor */ + Py_XDECREF(self->async_cursor); + self->async_cursor = NULL; + } break; default: @@ -922,12 +730,7 @@ conn_poll_green(connectionObject *self) res = PSYCO_POLL_ERROR; } - if (!(res == PSYCO_POLL_ERROR && PyErr_Occurred())) { - return PyInt_FromLong(res); - } else { - /* There is an error and an exception is already in place */ - return NULL; - } + return res; } /* conn_close - do anything needed to shut down the connection */ |
