summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--psycopg/connection.h5
-rw-r--r--psycopg/cursor.h2
-rw-r--r--psycopg/cursor_int.c53
-rw-r--r--psycopg/cursor_type.c54
-rw-r--r--psycopg/pqpath.c12
5 files changed, 91 insertions, 35 deletions
diff --git a/psycopg/connection.h b/psycopg/connection.h
index d3f4048..33ad30b 100644
--- a/psycopg/connection.h
+++ b/psycopg/connection.h
@@ -52,6 +52,10 @@ extern "C" {
#define CONN_STATUS_SENT_TRANSACTION_ISOLATION 12
#define CONN_STATUS_GET_TRANSACTION_ISOLATION 13
+/* async query execution status */
+#define ASYNC_READ 1
+#define ASYNC_WRITE 2
+
/* polling result, try to keep in sync with PostgresPollingStatusType from
libpq-fe.h */
#define PSYCO_POLL_READ 1
@@ -97,6 +101,7 @@ typedef struct {
PGconn *pgconn; /* the postgresql connection */
PyObject *async_cursor; /* a cursor executing an asynchronous query */
+ int async_status; /* asynchronous execution status */
/* notice processing */
PyObject *notice_list;
diff --git a/psycopg/cursor.h b/psycopg/cursor.h
index a422283..bcfc81f 100644
--- a/psycopg/cursor.h
+++ b/psycopg/cursor.h
@@ -87,6 +87,8 @@ typedef struct {
/* C-callable functions in cursor_int.c and cursor_ext.c */
HIDDEN void curs_reset(cursorObject *self);
HIDDEN void curs_get_last_result(cursorObject *self);
+HIDDEN PyObject *curs_poll_send(cursorObject *self);
+HIDDEN PyObject *curs_poll_fetch(cursorObject *self);
/* exception-raising macros */
#define EXC_IF_CURS_CLOSED(self) \
diff --git a/psycopg/cursor_int.c b/psycopg/cursor_int.c
index ae28af1..a4acba1 100644
--- a/psycopg/cursor_int.c
+++ b/psycopg/cursor_int.c
@@ -79,3 +79,56 @@ curs_get_last_result(cursorObject *self) {
Py_END_ALLOW_THREADS;
self->needsfetch = 1;
}
+
+/* curs_poll_send - handle cursor polling when flushing output */
+
+PyObject *
+curs_poll_send(cursorObject *self)
+{
+ int res;
+
+ /* flush queued output to the server */
+ res = pq_flush(self->conn);
+
+ if (res == 1) {
+ /* some data still waiting to be flushed */
+ Dprintf("cur_poll_send: returning %d", PSYCO_POLL_WRITE);
+ return PyInt_FromLong(PSYCO_POLL_WRITE);
+ }
+ else if (res == 0) {
+ /* all data flushed, start waiting for results */
+ Dprintf("cur_poll_send: returning %d", PSYCO_POLL_READ);
+ return PyInt_FromLong(PSYCO_POLL_READ);
+ }
+ else {
+ /* unexpected result */
+ PyErr_SetString(OperationalError, PQerrorMessage(self->conn->pgconn));
+ return NULL;
+ }
+}
+
+/* curs_poll_fetch - handle cursor polling when reading result */
+
+PyObject *
+curs_poll_fetch(cursorObject *self)
+{
+ int is_busy;
+
+ /* consume the input */
+ is_busy = pq_is_busy(self->conn);
+ if (is_busy == -1) {
+ /* there was an error, raise the exception */
+ return NULL;
+ }
+ else if (is_busy == 1) {
+ /* the connection is busy, tell the user to wait more */
+ Dprintf("cur_poll_fetch: returning %d", PSYCO_POLL_READ);
+ return PyInt_FromLong(PSYCO_POLL_READ);
+ }
+
+ /* all data has arrived */
+ curs_get_last_result(self);
+
+ Dprintf("cur_poll_fetch: returning %d", PSYCO_POLL_OK);
+ return PyInt_FromLong(PSYCO_POLL_OK);
+}
diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c
index 1fc88f9..8e264b3 100644
--- a/psycopg/cursor_type.c
+++ b/psycopg/cursor_type.c
@@ -1497,18 +1497,14 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
"fileno() -> int -- Return file descriptor associated to database connection."
static PyObject *
-psyco_curs_fileno(cursorObject *self, PyObject *args)
+psyco_curs_fileno(cursorObject *self)
{
long int socket;
- if (!PyArg_ParseTuple(args, "")) return NULL;
EXC_IF_CURS_CLOSED(self);
- /* note how we call PQflush() to make sure the user will use
- select() in the safe way! */
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock));
- PQflush(self->conn->pgconn);
socket = (long int)PQsocket(self->conn->pgconn);
pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS;
@@ -1516,43 +1512,31 @@ psyco_curs_fileno(cursorObject *self, PyObject *args)
return PyInt_FromLong(socket);
}
-/* extension: isready - return true if data from async execute is ready */
+/* extension: poll - return true if data from async execute is ready */
-#define psyco_curs_isready_doc \
-"isready() -> bool -- Return True if data is ready after an async query."
+#define psyco_curs_poll_doc \
+"poll() -- return POLL_OK if the query has been fully processed, " \
+ "POLL_READ if the query has been sent and the application should be " \
+ "waiting for the result to arrive or POLL_WRITE is the query is still " \
+ "being sent."
static PyObject *
-psyco_curs_isready(cursorObject *self, PyObject *args)
+psyco_curs_poll(cursorObject *self)
{
- int res;
-
- if (!PyArg_ParseTuple(args, "")) return NULL;
EXC_IF_CURS_CLOSED(self);
- /* pq_is_busy does its own locking, we don't need anything special but if
- the cursor is ready we need to fetch the result and free the connection
- for the next query. if -1 is returned we raise an exception. */
+ Dprintf("curs_poll: polling with status %d", self->conn->async_status);
- res = pq_is_busy(self->conn);
-
- if (res == 1) {
- Py_INCREF(Py_False);
- return Py_False;
+ if (self->conn->async_status == ASYNC_WRITE) {
+ return curs_poll_send(self);
}
- else if (res == -1) {
- return NULL;
+ else if (self->conn->async_status == ASYNC_READ) {
+ return curs_poll_fetch(self);
}
else {
- IFCLEARPGRES(self->pgres);
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&(self->conn->lock));
- self->pgres = PQgetResult(self->conn->pgconn);
- self->conn->async_cursor = NULL;
- pthread_mutex_unlock(&(self->conn->lock));
- Py_END_ALLOW_THREADS;
- self->needsfetch = 1;
- Py_INCREF(Py_True);
- return Py_True;
+ PyErr_Format(OperationalError, "unexpected execution status: %d",
+ self->conn->async_status);
+ return NULL;
}
}
@@ -1634,10 +1618,10 @@ static struct PyMethodDef cursorObject_methods[] = {
#ifdef PSYCOPG_EXTENSIONS
{"mogrify", (PyCFunction)psyco_curs_mogrify,
METH_VARARGS|METH_KEYWORDS, psyco_curs_mogrify_doc},
+ {"poll", (PyCFunction)psyco_curs_poll,
+ METH_VARARGS, psyco_curs_poll_doc},
{"fileno", (PyCFunction)psyco_curs_fileno,
- METH_VARARGS, psyco_curs_fileno_doc},
- {"isready", (PyCFunction)psyco_curs_isready,
- METH_VARARGS, psyco_curs_isready_doc},
+ METH_NOARGS, psyco_curs_fileno_doc},
{"copy_from", (PyCFunction)psyco_curs_copy_from,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc},
{"copy_to", (PyCFunction)psyco_curs_copy_to,
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index 3f4ad4e..14710a6 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -661,6 +661,7 @@ pq_execute(cursorObject *curs, const char *query, int async)
{
PGresult *pgres = NULL;
char *error = NULL;
+ int async_status = ASYNC_WRITE;
/* if the status of the connection is critical raise an exception and
definitely close the connection */
@@ -720,6 +721,16 @@ pq_execute(cursorObject *curs, const char *query, int async)
return -1;
}
Dprintf("pq_execute: async query sent to backend");
+
+ if (PQflush(curs->conn->pgconn) == 0) {
+ /* the query got fully sent to the server */
+ Dprintf("pq_execute: query got flushed immediately");
+ /* the async status will be ASYNC_READ */
+ async_status = ASYNC_READ;
+ }
+ else {
+ async_status = ASYNC_WRITE;
+ }
}
pthread_mutex_unlock(&(curs->conn->lock));
@@ -734,6 +745,7 @@ pq_execute(cursorObject *curs, const char *query, int async)
if (pq_fetch(curs) == -1) return -1;
}
else {
+ curs->conn->async_status = async_status;
curs->conn->async_cursor = (PyObject*)curs;
}