summaryrefslogtreecommitdiff
path: root/psycopg/connection_int.c
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-04-22 19:59:00 +0100
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-04-23 13:28:49 +0100
commitcb40342afa383176684c805789782403ee49a0a7 (patch)
tree1009d985c53a801958d3a55d760edbfd6e15d87a /psycopg/connection_int.c
parent0da4befe786679c557ef4ae52db486d704fc818d (diff)
downloadpsycopg2-cb40342afa383176684c805789782403ee49a0a7.tar.gz
poll implementation for async, sync and green connection unified.
Diffstat (limited to 'psycopg/connection_int.c')
-rw-r--r--psycopg/connection_int.c503
1 files changed, 153 insertions, 350 deletions
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index f398238..b878749 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -208,7 +208,6 @@ conn_get_encoding(PGresult *pgres)
for (i=0 ; i < strlen(tmp) ; i++)
encoding[i] = toupper(tmp[i]);
encoding[i] = '\0';
- CLEARPGRES(pgres);
return encoding;
}
@@ -258,10 +257,18 @@ conn_setup(connectionObject *self, PGconn *pgconn)
{
PGresult *pgres;
+ self->equote = conn_get_standard_conforming_strings(pgconn);
+ self->server_version = conn_get_server_version(pgconn);
+ self->protocol = conn_get_protocol_version(self->pgconn);
+
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
Py_BLOCK_THREADS;
+ if (pq_set_non_blocking(self, 1, 1) != 0) {
+ return -1;
+ }
+
if (!psyco_green()) {
Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, psyco_datestyle);
@@ -305,6 +312,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)
Py_BLOCK_THREADS;
return -1;
}
+ CLEARPGRES(pgres);
if (!psyco_green()) {
Py_UNBLOCK_THREADS;
@@ -334,8 +342,8 @@ conn_setup(connectionObject *self, PGconn *pgconn)
/* conn_connect - execute a connection to the database */
-int
-conn_sync_connect(connectionObject *self)
+static int
+_conn_sync_connect(connectionObject *self)
{
PGconn *pgconn;
PyObject *wait_rv;
@@ -382,19 +390,11 @@ conn_sync_connect(connectionObject *self)
}
}
- self->equote = conn_get_standard_conforming_strings(pgconn);
- self->server_version = conn_get_server_version(pgconn);
- self->protocol = conn_get_protocol_version(self->pgconn);
-
/* From here the connection is considered ready: with the new status,
* poll() will use PQisBusy instead of PQconnectPoll.
*/
self->status = CONN_STATUS_READY;
- if (pq_set_non_blocking(self, 1, 1) != 0) {
- return -1;
- }
-
if (conn_setup(self, self->pgconn) == -1) {
return -1;
}
@@ -403,7 +403,7 @@ conn_sync_connect(connectionObject *self)
}
static int
-conn_async_connect(connectionObject *self)
+_conn_async_connect(connectionObject *self)
{
PGconn *pgconn;
@@ -426,7 +426,10 @@ conn_async_connect(connectionObject *self)
PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self);
- self->status = CONN_STATUS_SETUP;
+ /* The connection will be completed banging on poll():
+ * First with _conn_poll_connecting() that will finish connection,
+ * then with _conn_poll_setup_async() that will do the same job
+ * of setup_async(). */
return 0;
}
@@ -436,340 +439,23 @@ conn_connect(connectionObject *self, long int async)
{
if (async == 1) {
Dprintf("con_connect: connecting in ASYNC mode");
- return conn_async_connect(self);
+ return _conn_async_connect(self);
}
else {
Dprintf("con_connect: connecting in SYNC mode");
- return conn_sync_connect(self);
- }
-}
-
-/* conn_poll_connect_send - handle connection polling when flushing output
- during asynchronous connection attempt. */
-
-PyObject *
-conn_poll_connect_send(connectionObject *self)
-{
- const char *query = NULL;
- int next_status;
- int ret;
-
- Dprintf("conn_poll_connect_send: status %d", self->status);
-
- switch (self->status) {
- case CONN_STATUS_SEND_DATESTYLE:
- /* set the datestyle */
- query = psyco_datestyle;
- next_status = CONN_STATUS_SENT_DATESTYLE;
- break;
- case CONN_STATUS_SEND_CLIENT_ENCODING:
- /* get the client_encoding */
- query = psyco_client_encoding;
- next_status = CONN_STATUS_SENT_CLIENT_ENCODING;
- break;
- case CONN_STATUS_SENT_DATESTYLE:
- case CONN_STATUS_SENT_CLIENT_ENCODING:
- /* the query has only been partially sent */
- query = NULL;
- next_status = self->status;
- break;
- default:
- /* unexpected state, error out */
- PyErr_Format(OperationalError,
- "unexpected state: %d", self->status);
- return NULL;
- }
-
- Dprintf("conn_poll_connect_send: sending query %-.200s", query);
-
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&(self->lock));
-
- if (query != NULL) {
- if (PQsendQuery(self->pgconn, query) != 1) {
- pthread_mutex_unlock(&(self->lock));
- Py_BLOCK_THREADS;
- PyErr_SetString(OperationalError,
- PQerrorMessage(self->pgconn));
- return NULL;
- }
- }
-
- if (PQflush(self->pgconn) == 0) {
- /* the query got fully sent to the server */
- Dprintf("conn_poll_connect_send: query got flushed immediately");
- /* the return value will be POLL_READ */
- ret = PSYCO_POLL_READ;
-
- /* advance the next status, since we skip over the "waiting for the
- query to be sent" status */
- switch (next_status) {
- case CONN_STATUS_SENT_DATESTYLE:
- next_status = CONN_STATUS_GET_DATESTYLE;
- break;
- case CONN_STATUS_SENT_CLIENT_ENCODING:
- next_status = CONN_STATUS_GET_CLIENT_ENCODING;
- break;
- }
- }
- else {
- /* query did not get sent completely, tell the client to wait for the
- socket to become writable */
- ret = PSYCO_POLL_WRITE;
- }
-
- self->status = next_status;
- Dprintf("conn_poll_connect_send: next status is %d, returning %d",
- self->status, ret);
-
- pthread_mutex_unlock(&(self->lock));
- Py_END_ALLOW_THREADS;
-
- return PyInt_FromLong(ret);
-}
-
-/* conn_poll_connect_fetch - handle connection polling when reading result
- during asynchronous connection attempt. */
-
-PyObject *
-conn_poll_connect_fetch(connectionObject *self)
-{
- PGresult *pgres;
- int is_busy;
- int next_status;
- int ret;
-
- Dprintf("conn_poll_connect_fetch: status %d", self->status);
-
- /* consume the input */
- is_busy = pq_is_busy(self);
- if (is_busy == -1) {
- /* there was an error, raise the exception */
- return NULL;
- }
- else if (is_busy == 1) {
- /* the connection is busy, tell the user to wait more */
- Dprintf("conn_poll_connect_fetch: connection busy, returning %d",
- PSYCO_POLL_READ);
- return PyInt_FromLong(PSYCO_POLL_READ);
- }
-
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&(self->lock));
-
- /* connection no longer busy, process input */
- pgres = PQgetResult(self->pgconn);
-
- /* do the rest while holding the GIL, we won't be calling into any
- blocking API */
- pthread_mutex_unlock(&(self->lock));
- Py_END_ALLOW_THREADS;
-
- Dprintf("conn_poll_connect_fetch: got result %p", pgres);
-
- /* we expect COMMAND_OK (from SET) or TUPLES_OK (from SHOW) */
- if (pgres == NULL || (PQresultStatus(pgres) != PGRES_COMMAND_OK &&
- PQresultStatus(pgres) != PGRES_TUPLES_OK)) {
- PyErr_SetString(OperationalError, "can't issue "
- "initial connection queries");
- IFCLEARPGRES(pgres);
- return NULL;
- }
-
- if (self->status == CONN_STATUS_GET_DATESTYLE) {
- /* got the result from SET DATESTYLE*/
- Dprintf("conn_poll_connect_fetch: datestyle set");
- next_status = CONN_STATUS_SEND_CLIENT_ENCODING;
- }
- else if (self->status == CONN_STATUS_GET_CLIENT_ENCODING) {
- /* got the client_encoding */
- self->encoding = conn_get_encoding(pgres);
- if (self->encoding == NULL) {
- return NULL;
- }
- Dprintf("conn_poll_connect_fetch: got client_encoding %s", self->encoding);
-
- /* since this is the last step, set the other instance variables now */
- self->equote = conn_get_standard_conforming_strings(self->pgconn);
- self->protocol = conn_get_protocol_version(self->pgconn);
- self->server_version = conn_get_server_version(self->pgconn);
- /*
- * asynchronous connections always use isolation level 0, the user is
- * expected to manage the transactions himself, by sending
- * (asynchronously) BEGIN and COMMIT statements.
- */
- self->isolation_level = 0;
-
- /* FIXME: this is a bug: the above queries were sent to the server
- with a blocking connection */
- if (pq_set_non_blocking(self, 1, 1) != 0) {
- return NULL;
- }
-
- /* next status is going to READY */
- next_status = CONN_STATUS_READY;
- }
- else {
- /* unexpected state, error out */
- PyErr_Format(OperationalError,
- "unexpected state: %d", self->status);
- return NULL;
+ return _conn_sync_connect(self);
}
-
- /* clear any leftover result, there should be none, but the protocol
- requires calling PQgetResult until you get a NULL */
- pq_clear_async(self);
-
- self->status = next_status;
-
- /* if the curent status is READY it means we got the result of the
- last initialization query, so we return POLL_OK, otherwise we need to
- send another query, so return POLL_WRITE */
- ret = self->status == CONN_STATUS_READY ? PSYCO_POLL_OK : PSYCO_POLL_WRITE;
- Dprintf("conn_poll_connect_fetch: next status is %d, returning %d",
- self->status, ret);
- return PyInt_FromLong(ret);
}
-/* conn_poll_ready - handle connection polling when it is already open */
-
-PyObject *
-conn_poll_ready(connectionObject *self)
-{
- int is_busy;
-
- /* if there is an asynchronous query underway, poll it */
- if (self->async_cursor != NULL) {
- if (self->async_status == ASYNC_WRITE) {
- return conn_poll_send(self);
- }
- else {
- /* this gets called both for ASYNC_READ and ASYNC_DONE, because
- even if the async query is complete, we still might want to
- check for NOTIFYs */
- return conn_poll_fetch(self);
- }
- }
-
- /* otherwise just check for NOTIFYs */
- is_busy = pq_is_busy(self);
- if (is_busy == -1) {
- /* there was an error, raise the exception */
- return NULL;
- }
- else if (is_busy == 1) {
- /* the connection is busy, tell the user to wait more */
- Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_READ);
- return PyInt_FromLong(PSYCO_POLL_READ);
- }
- else {
- /* connection is idle */
- Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_OK);
- return PyInt_FromLong(PSYCO_POLL_OK);
- }
-}
-
-/* conn_poll_send - poll the connection when flushing data to the backend */
-
-PyObject *
-conn_poll_send(connectionObject *self)
-{
- int res;
-
- /* flush queued output to the server */
- res = pq_flush(self);
-
- if (res == 1) {
- /* some data still waiting to be flushed */
- Dprintf("conn_poll_send: returning %d", PSYCO_POLL_WRITE);
- return PyInt_FromLong(PSYCO_POLL_WRITE);
- }
- else if (res == 0) {
- /* all data flushed, start waiting for results */
- Dprintf("conn_poll_send: returning %d", PSYCO_POLL_READ);
- self->async_status = ASYNC_READ;
- return PyInt_FromLong(PSYCO_POLL_READ);
- }
- else {
- /* unexpected result */
- PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
- return NULL;
- }
-}
-
-/* conn_poll_fetch - poll the connection when reading results from the backend
- *
- * If self_curs is available, use it to store the result of the last query.
- * Also unlink it when finished.
- */
-
-PyObject *
-conn_poll_fetch(connectionObject *self)
-{
- int is_busy;
- int last_result;
-
- /* consume the input */
- is_busy = pq_is_busy(self);
- if (is_busy == -1) {
- /* there was an error, raise the exception */
- return NULL;
- }
- else if (is_busy == 1) {
- /* the connection is busy, tell the user to wait more */
- Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_READ);
- return PyInt_FromLong(PSYCO_POLL_READ);
- }
-
- /* try to fetch the data only if this was a poll following a read
- request; else just return POLL_OK to the user: this is necessary
- because of asynchronous NOTIFYs that can be sent by the backend
- even if the user didn't asked for them */
-
- if (self->async_status == ASYNC_READ && self->async_cursor) {
- cursorObject *curs = (cursorObject *)self->async_cursor;
- IFCLEARPGRES(curs->pgres);
- curs->pgres = pq_get_last_result(self);
-
- /* fetch the tuples (if there are any) and build the result. We don't
- * care if pq_fetch return 0 or 1, but if there was an error, we want to
- * signal it to the caller. */
- last_result = pq_fetch(curs) == -1 ? -1 : 0;
-
- /* We have finished with our async_cursor */
- Py_XDECREF(self->async_cursor);
- self->async_cursor = NULL;
- }
- else {
- last_result = 0;
- }
-
- if (last_result == 0) {
- Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_OK);
- /* self->async_status cannot be ASYNC_WRITE here, because we
- never execute curs_poll_fetch in ASYNC_WRITE state, so we can
- safely set it to ASYNC_DONE because we either fetched the result or
- there is no result to fetch */
- self->async_status = ASYNC_DONE;
- return PyInt_FromLong(PSYCO_POLL_OK);
- }
- else if (last_result == 1) {
- Dprintf("conn_poll_fetch: got result, but data remaining, "
- "returning %d", PSYCO_POLL_READ);
- return PyInt_FromLong(PSYCO_POLL_READ);
- }
- else {
- return NULL;
- }
-}
/* poll during a connection attempt until the connection has established. */
-int
+static int
_conn_poll_connecting(connectionObject *self)
{
int res = PSYCO_POLL_ERROR;
+ Dprintf("conn_poll: poll connecting");
switch (PQconnectPoll(self->pgconn)) {
case PGRES_POLLING_OK:
res = PSYCO_POLL_OK;
@@ -792,11 +478,13 @@ _conn_poll_connecting(connectionObject *self)
/* Advance to the next state after an attempt of flushing output */
-int
+
+static int
_conn_poll_advance_write(connectionObject *self, int flush)
{
int res;
+ Dprintf("conn_poll: poll writing");
switch (flush) {
case 0: /* success */
/* we've finished pushing the query to the server. Let's start
@@ -821,11 +509,12 @@ _conn_poll_advance_write(connectionObject *self, int flush)
}
/* Advance to the next state after a call to a pq_is_busy* function */
-int
+static int
_conn_poll_advance_read(connectionObject *self, int busy)
{
int res;
+ Dprintf("conn_poll: poll reading");
switch (busy) {
case 0: /* result is ready */
res = PSYCO_POLL_OK;
@@ -850,7 +539,8 @@ _conn_poll_advance_read(connectionObject *self, int busy)
Advance the async_status (usually going WRITE -> READ -> DONE) but don't
mess with the connection status. */
-int
+
+static int
_conn_poll_query(connectionObject *self)
{
int res = PSYCO_POLL_ERROR;
@@ -891,30 +581,148 @@ _conn_poll_query(connectionObject *self)
return res;
}
+/* Advance to the next state during an async connection setup
+ *
+ * If the connection is green, this is performed by the regular
+ * sync code so the queries are sent by conn_setup() while in
+ * CONN_STATUS_READY state.
+ */
+static int
+_conn_poll_setup_async(connectionObject *self)
+{
+ int res = PSYCO_POLL_ERROR;
+ PGresult *pgres;
+
+ switch (self->status) {
+ case CONN_STATUS_CONNECTING:
+ /* Set the connection to nonblocking now. */
+ if (pq_set_non_blocking(self, 1, 1) != 0) {
+ break;
+ }
+
+ self->equote = conn_get_standard_conforming_strings(self->pgconn);
+ self->protocol = conn_get_protocol_version(self->pgconn);
+ self->server_version = conn_get_server_version(self->pgconn);
-/* conn_poll_green - poll a *sync* connection with external wait */
+ /* asynchronous connections always use isolation level 0, the user is
+ * expected to manage the transactions himself, by sending
+ * (asynchronously) BEGIN and COMMIT statements.
+ */
+ self->isolation_level = 0;
-PyObject *
-conn_poll_green(connectionObject *self)
+ Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE");
+ self->status = CONN_STATUS_DATESTYLE;
+ if (0 == pq_send_query(self, psyco_datestyle)) {
+ PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
+ break;
+ }
+ Dprintf("conn_poll: async_status -> ASYNC_WRITE");
+ self->async_status = ASYNC_WRITE;
+ res = PSYCO_POLL_WRITE;
+ break;
+
+ case CONN_STATUS_DATESTYLE:
+ res = _conn_poll_query(self);
+ if (res == PSYCO_POLL_OK) {
+ res = PSYCO_POLL_ERROR;
+ pgres = pq_get_last_result(self);
+ if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) {
+ PyErr_SetString(OperationalError, "can't set datestyle to ISO");
+ break;
+ }
+ CLEARPGRES(pgres);
+
+ Dprintf("conn_poll: status -> CONN_STATUS_CLIENT_ENCODING");
+ self->status = CONN_STATUS_CLIENT_ENCODING;
+ if (0 == pq_send_query(self, psyco_client_encoding)) {
+ PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
+ break;
+ }
+ Dprintf("conn_poll: async_status -> ASYNC_WRITE");
+ self->async_status = ASYNC_WRITE;
+ res = PSYCO_POLL_WRITE;
+ }
+ break;
+
+ case CONN_STATUS_CLIENT_ENCODING:
+ res = _conn_poll_query(self);
+ if (res == PSYCO_POLL_OK) {
+ res = PSYCO_POLL_ERROR;
+ pgres = pq_get_last_result(self);
+ if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
+ PyErr_SetString(OperationalError, "can't fetch client_encoding");
+ break;
+ }
+
+ /* conn_get_encoding returns a malloc'd string */
+ self->encoding = conn_get_encoding(pgres);
+ CLEARPGRES(pgres);
+ if (self->encoding == NULL) { break; }
+
+ Dprintf("conn_poll: status -> CONN_STATUS_READY");
+ self->status = CONN_STATUS_READY;
+ res = PSYCO_POLL_OK;
+ }
+ break;
+ }
+
+ return res;
+}
+
+
+/* conn_poll - Main polling switch
+ *
+ * The function is called in all the states and connection types and invokes
+ * the right "next step".
+ */
+
+int
+conn_poll(connectionObject *self)
{
int res = PSYCO_POLL_ERROR;
+ Dprintf("conn_poll: status = %d", self->status);
switch (self->status) {
case CONN_STATUS_SETUP:
- Dprintf("conn_poll: status = CONN_STATUS_SETUP");
+ Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING");
self->status = CONN_STATUS_CONNECTING;
- res = PSYCO_POLL_WRITE;
+ res = PSYCO_POLL_WRITE;
break;
case CONN_STATUS_CONNECTING:
- Dprintf("conn_poll: status = CONN_STATUS_CONNECTING");
res = _conn_poll_connecting(self);
+ if (res == PSYCO_POLL_OK && self->async) {
+ res = _conn_poll_setup_async(self);
+ }
+ break;
+
+ case CONN_STATUS_DATESTYLE:
+ case CONN_STATUS_CLIENT_ENCODING:
+ res = _conn_poll_setup_async(self);
break;
case CONN_STATUS_READY:
case CONN_STATUS_BEGIN:
- Dprintf("conn_poll: status = CONN_STATUS_READY/BEGIN");
res = _conn_poll_query(self);
+
+ if (res == PSYCO_POLL_OK && self->async_cursor) {
+ /* An async query has just finished: parse the tuple in the
+ * target cursor. */
+ cursorObject *curs = (cursorObject *)self->async_cursor;
+ IFCLEARPGRES(curs->pgres);
+ curs->pgres = pq_get_last_result(self);
+
+ /* fetch the tuples (if there are any) and build the result. We
+ * don't care if pq_fetch return 0 or 1, but if there was an error,
+ * we want to signal it to the caller. */
+ if (pq_fetch(curs) == -1) {
+ res = PSYCO_POLL_ERROR;
+ }
+
+ /* We have finished with our async_cursor */
+ Py_XDECREF(self->async_cursor);
+ self->async_cursor = NULL;
+ }
break;
default:
@@ -922,12 +730,7 @@ conn_poll_green(connectionObject *self)
res = PSYCO_POLL_ERROR;
}
- if (!(res == PSYCO_POLL_ERROR && PyErr_Occurred())) {
- return PyInt_FromLong(res);
- } else {
- /* There is an error and an exception is already in place */
- return NULL;
- }
+ return res;
}
/* conn_close - do anything needed to shut down the connection */