diff options
-rw-r--r-- | psycopg/connection.h | 5 | ||||
-rw-r--r-- | psycopg/cursor.h | 2 | ||||
-rw-r--r-- | psycopg/cursor_int.c | 53 | ||||
-rw-r--r-- | psycopg/cursor_type.c | 54 | ||||
-rw-r--r-- | psycopg/pqpath.c | 12 |
5 files changed, 91 insertions, 35 deletions
diff --git a/psycopg/connection.h b/psycopg/connection.h index d3f4048..33ad30b 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -52,6 +52,10 @@ extern "C" { #define CONN_STATUS_SENT_TRANSACTION_ISOLATION 12 #define CONN_STATUS_GET_TRANSACTION_ISOLATION 13 +/* async query execution status */ +#define ASYNC_READ 1 +#define ASYNC_WRITE 2 + /* polling result, try to keep in sync with PostgresPollingStatusType from libpq-fe.h */ #define PSYCO_POLL_READ 1 @@ -97,6 +101,7 @@ typedef struct { PGconn *pgconn; /* the postgresql connection */ PyObject *async_cursor; /* a cursor executing an asynchronous query */ + int async_status; /* asynchronous execution status */ /* notice processing */ PyObject *notice_list; diff --git a/psycopg/cursor.h b/psycopg/cursor.h index a422283..bcfc81f 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -87,6 +87,8 @@ typedef struct { /* C-callable functions in cursor_int.c and cursor_ext.c */ HIDDEN void curs_reset(cursorObject *self); HIDDEN void curs_get_last_result(cursorObject *self); +HIDDEN PyObject *curs_poll_send(cursorObject *self); +HIDDEN PyObject *curs_poll_fetch(cursorObject *self); /* exception-raising macros */ #define EXC_IF_CURS_CLOSED(self) \ diff --git a/psycopg/cursor_int.c b/psycopg/cursor_int.c index ae28af1..a4acba1 100644 --- a/psycopg/cursor_int.c +++ b/psycopg/cursor_int.c @@ -79,3 +79,56 @@ curs_get_last_result(cursorObject *self) { Py_END_ALLOW_THREADS; self->needsfetch = 1; } + +/* curs_poll_send - handle cursor polling when flushing output */ + +PyObject * +curs_poll_send(cursorObject *self) +{ + int res; + + /* flush queued output to the server */ + res = pq_flush(self->conn); + + if (res == 1) { + /* some data still waiting to be flushed */ + Dprintf("cur_poll_send: returning %d", PSYCO_POLL_WRITE); + return PyInt_FromLong(PSYCO_POLL_WRITE); + } + else if (res == 0) { + /* all data flushed, start waiting for results */ + Dprintf("cur_poll_send: returning %d", PSYCO_POLL_READ); + return PyInt_FromLong(PSYCO_POLL_READ); + } + else { + /* unexpected result */ + PyErr_SetString(OperationalError, PQerrorMessage(self->conn->pgconn)); + return NULL; + } +} + +/* curs_poll_fetch - handle cursor polling when reading result */ + +PyObject * +curs_poll_fetch(cursorObject *self) +{ + int is_busy; + + /* consume the input */ + is_busy = pq_is_busy(self->conn); + if (is_busy == -1) { + /* there was an error, raise the exception */ + return NULL; + } + else if (is_busy == 1) { + /* the connection is busy, tell the user to wait more */ + Dprintf("cur_poll_fetch: returning %d", PSYCO_POLL_READ); + return PyInt_FromLong(PSYCO_POLL_READ); + } + + /* all data has arrived */ + curs_get_last_result(self); + + Dprintf("cur_poll_fetch: returning %d", PSYCO_POLL_OK); + return PyInt_FromLong(PSYCO_POLL_OK); +} diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 1fc88f9..8e264b3 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -1497,18 +1497,14 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs) "fileno() -> int -- Return file descriptor associated to database connection." static PyObject * -psyco_curs_fileno(cursorObject *self, PyObject *args) +psyco_curs_fileno(cursorObject *self) { long int socket; - if (!PyArg_ParseTuple(args, "")) return NULL; EXC_IF_CURS_CLOSED(self); - /* note how we call PQflush() to make sure the user will use - select() in the safe way! */ Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(self->conn->lock)); - PQflush(self->conn->pgconn); socket = (long int)PQsocket(self->conn->pgconn); pthread_mutex_unlock(&(self->conn->lock)); Py_END_ALLOW_THREADS; @@ -1516,43 +1512,31 @@ psyco_curs_fileno(cursorObject *self, PyObject *args) return PyInt_FromLong(socket); } -/* extension: isready - return true if data from async execute is ready */ +/* extension: poll - return true if data from async execute is ready */ -#define psyco_curs_isready_doc \ -"isready() -> bool -- Return True if data is ready after an async query." +#define psyco_curs_poll_doc \ +"poll() -- return POLL_OK if the query has been fully processed, " \ + "POLL_READ if the query has been sent and the application should be " \ + "waiting for the result to arrive or POLL_WRITE is the query is still " \ + "being sent." static PyObject * -psyco_curs_isready(cursorObject *self, PyObject *args) +psyco_curs_poll(cursorObject *self) { - int res; - - if (!PyArg_ParseTuple(args, "")) return NULL; EXC_IF_CURS_CLOSED(self); - /* pq_is_busy does its own locking, we don't need anything special but if - the cursor is ready we need to fetch the result and free the connection - for the next query. if -1 is returned we raise an exception. */ + Dprintf("curs_poll: polling with status %d", self->conn->async_status); - res = pq_is_busy(self->conn); - - if (res == 1) { - Py_INCREF(Py_False); - return Py_False; + if (self->conn->async_status == ASYNC_WRITE) { + return curs_poll_send(self); } - else if (res == -1) { - return NULL; + else if (self->conn->async_status == ASYNC_READ) { + return curs_poll_fetch(self); } else { - IFCLEARPGRES(self->pgres); - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(self->conn->lock)); - self->pgres = PQgetResult(self->conn->pgconn); - self->conn->async_cursor = NULL; - pthread_mutex_unlock(&(self->conn->lock)); - Py_END_ALLOW_THREADS; - self->needsfetch = 1; - Py_INCREF(Py_True); - return Py_True; + PyErr_Format(OperationalError, "unexpected execution status: %d", + self->conn->async_status); + return NULL; } } @@ -1634,10 +1618,10 @@ static struct PyMethodDef cursorObject_methods[] = { #ifdef PSYCOPG_EXTENSIONS {"mogrify", (PyCFunction)psyco_curs_mogrify, METH_VARARGS|METH_KEYWORDS, psyco_curs_mogrify_doc}, + {"poll", (PyCFunction)psyco_curs_poll, + METH_VARARGS, psyco_curs_poll_doc}, {"fileno", (PyCFunction)psyco_curs_fileno, - METH_VARARGS, psyco_curs_fileno_doc}, - {"isready", (PyCFunction)psyco_curs_isready, - METH_VARARGS, psyco_curs_isready_doc}, + METH_NOARGS, psyco_curs_fileno_doc}, {"copy_from", (PyCFunction)psyco_curs_copy_from, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc}, {"copy_to", (PyCFunction)psyco_curs_copy_to, diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 3f4ad4e..14710a6 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -661,6 +661,7 @@ pq_execute(cursorObject *curs, const char *query, int async) { PGresult *pgres = NULL; char *error = NULL; + int async_status = ASYNC_WRITE; /* if the status of the connection is critical raise an exception and definitely close the connection */ @@ -720,6 +721,16 @@ pq_execute(cursorObject *curs, const char *query, int async) return -1; } Dprintf("pq_execute: async query sent to backend"); + + if (PQflush(curs->conn->pgconn) == 0) { + /* the query got fully sent to the server */ + Dprintf("pq_execute: query got flushed immediately"); + /* the async status will be ASYNC_READ */ + async_status = ASYNC_READ; + } + else { + async_status = ASYNC_WRITE; + } } pthread_mutex_unlock(&(curs->conn->lock)); @@ -734,6 +745,7 @@ pq_execute(cursorObject *curs, const char *query, int async) if (pq_fetch(curs) == -1) return -1; } else { + curs->conn->async_status = async_status; curs->conn->async_cursor = (PyObject*)curs; } |