summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--psycopg/connection.h1
-rw-r--r--psycopg/connection_int.c38
-rw-r--r--psycopg/connection_type.c7
-rw-r--r--psycopg/cursor_type.c57
-rwxr-xr-xtests/test_async.py42
5 files changed, 74 insertions, 71 deletions
diff --git a/psycopg/connection.h b/psycopg/connection.h
index 94d756d..6b74ca7 100644
--- a/psycopg/connection.h
+++ b/psycopg/connection.h
@@ -132,6 +132,7 @@ HIDDEN int conn_switch_isolation_level(connectionObject *self, int level);
HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc);
HIDDEN PyObject *conn_poll_send(connectionObject *self);
HIDDEN PyObject *conn_poll_fetch(connectionObject *self);
+HIDDEN PyObject *conn_poll_ready(connectionObject *self);
/* exception-raising macros */
#define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index a41d754..81eae8a 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -564,6 +564,44 @@ conn_poll_fetch(connectionObject *self)
return PyInt_FromLong(ret);
}
+/* conn_poll_ready - handle connection polling when it is already open */
+
+PyObject *
+conn_poll_ready(connectionObject *self)
+{
+ int is_busy;
+
+ /* if there is an asynchronous query underway, poll it */
+ if (self->async_cursor != NULL) {
+ if (self->async_status == ASYNC_WRITE) {
+ return curs_poll_send((cursorObject *) self->async_cursor);
+ }
+ else {
+ /* this gets called both for ASYNC_READ and ASYNC_DONE, because
+ even if the async query is complete, we still might want to
+ check for NOTIFYs */
+ return curs_poll_fetch((cursorObject *) self->async_cursor);
+ }
+ }
+
+ /* otherwise just check for NOTIFYs */
+ is_busy = pq_is_busy(self);
+ if (is_busy == -1) {
+ /* there was an error, raise the exception */
+ return NULL;
+ }
+ else if (is_busy == 1) {
+ /* the connection is busy, tell the user to wait more */
+ Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_READ);
+ return PyInt_FromLong(PSYCO_POLL_READ);
+ }
+ else {
+ /* connection is idle */
+ Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_OK);
+ return PyInt_FromLong(PSYCO_POLL_OK);
+ }
+}
+
/* conn_close - do anything needed to shut down the connection */
void
diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c
index f7fe190..432078f 100644
--- a/psycopg/connection_type.c
+++ b/psycopg/connection_type.c
@@ -449,8 +449,11 @@ psyco_conn_poll(connectionObject *self)
break;
case CONN_STATUS_READY:
- /* we have completed the connection setup */
- return PyInt_FromLong(PSYCO_POLL_OK);
+ case CONN_STATUS_BEGIN:
+ /* The connection is ready, but we might be in an asynchronous query,
+ or we just might want to check for NOTIFYs. For synchronous
+ connections the status might be BEGIN, not READY. */
+ return conn_poll_ready(self);
default:
/* everything else is an error */
diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c
index b916477..2d4751e 100644
--- a/psycopg/cursor_type.c
+++ b/psycopg/cursor_type.c
@@ -1441,59 +1441,6 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
return res;
}
-/* extension: fileno - return the file descripor of the connection */
-
-#define psyco_curs_fileno_doc \
-"fileno() -> int -- Return file descriptor associated to database connection."
-
-static PyObject *
-psyco_curs_fileno(cursorObject *self)
-{
- long int socket;
-
- EXC_IF_CURS_CLOSED(self);
-
- Py_BEGIN_ALLOW_THREADS;
- pthread_mutex_lock(&(self->conn->lock));
- socket = (long int)PQsocket(self->conn->pgconn);
- pthread_mutex_unlock(&(self->conn->lock));
- Py_END_ALLOW_THREADS;
-
- return PyInt_FromLong(socket);
-}
-
-/* extension: poll - return true if data from async execute is ready */
-
-#define psyco_curs_poll_doc \
-"poll() -- return POLL_OK if the query has been fully processed, " \
- "POLL_READ if the query has been sent and the application should be " \
- "waiting for the result to arrive or POLL_WRITE is the query is still " \
- "being sent."
-
-static PyObject *
-psyco_curs_poll(cursorObject *self)
-{
- EXC_IF_CURS_CLOSED(self);
-
- if (self->conn->async_cursor != NULL &&
- self->conn->async_cursor != (PyObject *) self) {
- PyErr_SetString(ProgrammingError, "poll with wrong cursor");
- return NULL;
- }
-
- Dprintf("curs_poll: polling with status %d", self->conn->async_status);
-
- if (self->conn->async_status == ASYNC_WRITE) {
- return curs_poll_send(self);
- }
- else {
- /* this gets called both for ASYNC_READ and ASYNC_DONE, because even
- if the async query is complete, we still might want to check for
- NOTIFYs */
- return curs_poll_fetch(self);
- }
-}
-
/* extension: closed - return true if cursor is closed*/
#define psyco_curs_closed_doc \
@@ -1572,10 +1519,6 @@ static struct PyMethodDef cursorObject_methods[] = {
#ifdef PSYCOPG_EXTENSIONS
{"mogrify", (PyCFunction)psyco_curs_mogrify,
METH_VARARGS|METH_KEYWORDS, psyco_curs_mogrify_doc},
- {"poll", (PyCFunction)psyco_curs_poll,
- METH_VARARGS, psyco_curs_poll_doc},
- {"fileno", (PyCFunction)psyco_curs_fileno,
- METH_NOARGS, psyco_curs_fileno_doc},
{"copy_from", (PyCFunction)psyco_curs_copy_from,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc},
{"copy_to", (PyCFunction)psyco_curs_copy_to,
diff --git a/tests/test_async.py b/tests/test_async.py
index 91a5392..e1b50cb 100755
--- a/tests/test_async.py
+++ b/tests/test_async.py
@@ -4,6 +4,7 @@ import unittest
import psycopg2
from psycopg2 import extensions
+import time
import select
import StringIO
@@ -48,7 +49,10 @@ class AsyncTests(unittest.TestCase):
self.sync_conn.close()
self.conn.close()
- def wait(self, pollable):
+ def wait(self, cur_or_conn):
+ pollable = cur_or_conn
+ if not hasattr(pollable, 'poll'):
+ pollable = cur_or_conn.connection
while True:
state = pollable.poll()
if state == psycopg2.extensions.POLL_OK:
@@ -301,7 +305,7 @@ class AsyncTests(unittest.TestCase):
curs = self.conn.cursor()
for mb in 1, 5, 10, 20, 50:
size = mb * 1024 * 1024
- stub = PollableStub(curs)
+ stub = PollableStub(self.conn)
curs.execute("select %s;", ('x' * size,))
self.wait(stub)
self.assertEqual(size, len(curs.fetchone()[0]))
@@ -312,19 +316,33 @@ class AsyncTests(unittest.TestCase):
def test_sync_poll(self):
cur = self.sync_conn.cursor()
- # polling a sync cursor works
- cur.poll()
+ cur.execute("select 1")
+ # polling with a sync query works
+ cur.connection.poll()
+ self.assertEquals(cur.fetchone()[0], 1)
- def test_async_poll_wrong_cursor(self):
- cur1 = self.conn.cursor()
- cur2 = self.conn.cursor()
- cur1.execute("select 1")
+ def test_notify(self):
+ cur = self.conn.cursor()
+ sync_cur = self.sync_conn.cursor()
- # polling a cursor that's not currently executing is an error
- self.assertRaises(psycopg2.ProgrammingError, cur2.poll)
+ sync_cur.execute("listen test_notify")
+ self.sync_conn.commit()
+ cur.execute("notify test_notify")
+ self.wait(cur)
- self.wait(cur1)
- self.assertEquals(cur1.fetchone()[0], 1)
+ self.assertEquals(self.sync_conn.notifies, [])
+
+ pid = self.conn.get_backend_pid()
+ for _ in range(5):
+ self.wait(self.sync_conn)
+ if not self.sync_conn.notifies:
+ time.sleep(0.5)
+ continue
+ self.assertEquals(len(self.sync_conn.notifies), 1)
+ self.assertEquals(self.sync_conn.notifies.pop(),
+ (pid, "test_notify"))
+ return
+ self.fail("No NOTIFY in 2.5 seconds")
def test_async_fetch_wrong_cursor(self):
cur1 = self.conn.cursor()