summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog2
-rw-r--r--doc/src/advanced.rst15
-rw-r--r--psycopg/connection.h1
-rw-r--r--psycopg/connection_int.c27
-rw-r--r--psycopg/pqpath.c25
-rwxr-xr-xtests/__init__.py2
-rwxr-xr-xtests/test_notify.py100
7 files changed, 145 insertions, 27 deletions
diff --git a/ChangeLog b/ChangeLog
index 613a134..12cc6ab 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,6 +2,8 @@
* lib/pqpath.c: Fixed reference leak in notify reception.
+ * Notifies are collected if available after every query execution.
+
2010-04-13 Daniele Varrazzo <daniele.varrazzo@gmail.com>
* lib/extensions.py: DECIMAL typecaster imported from _psycopg.
diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst
index a601339..b8ed537 100644
--- a/doc/src/advanced.rst
+++ b/doc/src/advanced.rst
@@ -227,11 +227,16 @@ manner.
.. |NOTIFY| replace:: :sql:`NOTIFY`
.. _NOTIFY: http://www.postgresql.org/docs/8.4/static/sql-notify.html
-Notification are received using the `~connection.poll()` method. A simple
-application could poll the connection from time to time to check if something
-new has arrived. A better strategy is to use some I/O completion function such
-as |select()|_ to sleep until awaken from the kernel when there is some data to
-read on the connection, thereby using no CPU unless there is something to read::
+Notification are received after every query execution. If the user is interested
+in receiveing notification but not in performing any query, the
+`~connection.poll()` method can be used to check for notification without
+wasting resources.
+
+A simple application could poll the connection from time to time to check if
+something new has arrived. A better strategy is to use some I/O completion
+function such as |select()|_ to sleep until awaken from the kernel when there is
+some data to read on the connection, thereby using no CPU unless there is
+something to read::
import select
import psycopg2
diff --git a/psycopg/connection.h b/psycopg/connection.h
index 6b74ca7..3a57e8d 100644
--- a/psycopg/connection.h
+++ b/psycopg/connection.h
@@ -123,6 +123,7 @@ HIDDEN int conn_get_isolation_level(PGresult *pgres);
HIDDEN int conn_get_protocol_version(PGconn *pgconn);
HIDDEN void conn_notice_process(connectionObject *self);
HIDDEN void conn_notice_clean(connectionObject *self);
+HIDDEN void conn_notifies_process(connectionObject *self);
HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn);
HIDDEN int conn_connect(connectionObject *self, long int async);
HIDDEN void conn_close(connectionObject *self);
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index 81eae8a..66a2252 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -123,6 +123,33 @@ conn_notice_clean(connectionObject *self)
Py_END_ALLOW_THREADS;
}
+
+/* conn_notifies_process - make received notification available
+ *
+ * The function should be called with the connection lock and holding the GIL.
+ */
+
+void
+conn_notifies_process(connectionObject *self)
+{
+ PGnotify *pgn;
+
+ while ((pgn = PQnotifies(self->pgconn)) != NULL) {
+ PyObject *notify;
+
+ Dprintf("conn_notifies_process: got NOTIFY from pid %d, msg = %s",
+ (int) pgn->be_pid, pgn->relname);
+
+ notify = PyTuple_New(2);
+ PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid));
+ PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname));
+ PyList_Append(self->notifies, notify);
+ Py_DECREF(notify);
+ PQfreemem(pgn);
+ }
+}
+
+
/*
* the conn_get_* family of functions makes it easier to obtain the connection
* parameters from query results or by interrogating the connection itself
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index a229943..a79533d 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -603,8 +603,6 @@ int
pq_is_busy(connectionObject *conn)
{
int res;
- PGnotify *pgn;
-
Dprintf("pq_is_busy: consuming input");
Py_BEGIN_ALLOW_THREADS;
@@ -618,30 +616,13 @@ pq_is_busy(connectionObject *conn)
return -1;
}
-
- /* now check for notifies */
- while ((pgn = PQnotifies(conn->pgconn)) != NULL) {
- PyObject *notify;
-
- Dprintf("curs_is_busy: got NOTIFY from pid %d, msg = %s",
- (int) pgn->be_pid, pgn->relname);
-
- Py_BLOCK_THREADS;
- notify = PyTuple_New(2);
- PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid));
- PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname));
- PyList_Append(conn->notifies, notify);
- Py_DECREF(notify);
- Py_UNBLOCK_THREADS;
- PQfreemem(pgn);
- }
-
res = PQisBusy(conn->pgconn);
pthread_mutex_unlock(&(conn->lock));
Py_END_ALLOW_THREADS;
conn_notice_process(conn);
+ conn_notifies_process(conn);
return res;
}
@@ -1327,13 +1308,13 @@ pq_fetch(cursorObject *curs)
break;
}
- Dprintf("pq_fetch: fetching done; check for critical errors");
-
conn_notice_process(curs->conn);
+ conn_notifies_process(curs->conn);
/* error checking, close the connection if necessary (some critical errors
are not really critical, like a COPY FROM error: if that's the case we
raise the exception but we avoid to close the connection) */
+ Dprintf("pq_fetch: fetching done; check for critical errors");
if (curs->conn->critical) {
if (ex == -1) {
pq_resolve_critical(curs->conn, 1);
diff --git a/tests/__init__.py b/tests/__init__.py
index cb15389..5863de6 100755
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -28,6 +28,7 @@ import types_basic
import types_extras
import test_lobject
import test_copy
+import test_notify
import test_async
def test_suite():
@@ -43,6 +44,7 @@ def test_suite():
suite.addTest(types_extras.test_suite())
suite.addTest(test_lobject.test_suite())
suite.addTest(test_copy.test_suite())
+ suite.addTest(test_notify.test_suite())
suite.addTest(test_async.test_suite())
return suite
diff --git a/tests/test_notify.py b/tests/test_notify.py
new file mode 100755
index 0000000..722aecf
--- /dev/null
+++ b/tests/test_notify.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+import unittest
+
+import psycopg2
+from psycopg2 import extensions
+
+import time
+import select
+import signal
+from subprocess import Popen
+
+import sys
+if sys.version_info < (3,):
+ import tests
+else:
+ import py3tests as tests
+
+
+class NotifiesTests(unittest.TestCase):
+
+ def setUp(self):
+ self.conn = psycopg2.connect(tests.dsn)
+
+ def tearDown(self):
+ self.conn.close()
+
+ def autocommit(self, conn):
+ """Set a connection in autocommit mode."""
+ conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+
+ def listen(self, name):
+ """Start listening for a name on self.conn."""
+ curs = self.conn.cursor()
+ curs.execute("LISTEN " + name)
+ curs.close()
+
+ def notify(self, name, sec=0):
+ """Send a notification to the database, eventually after some time."""
+ script = ("""\
+import time
+time.sleep(%(sec)s)
+import psycopg2
+import psycopg2.extensions
+conn = psycopg2.connect(%(dsn)r)
+conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+curs = conn.cursor()
+curs.execute("NOTIFY " %(name)r)
+curs.close()
+conn.close()
+"""
+ % { 'dsn': tests.dsn, 'sec': sec, 'name': name})
+
+ return Popen([sys.executable, '-c', script])
+
+ def test_notifies_received_on_poll(self):
+ self.autocommit(self.conn)
+ self.listen('foo')
+
+ self.notify('foo', 1);
+
+ t0 = time.time()
+ ready = select.select([self.conn], [], [], 2)
+ t1 = time.time()
+ self.assert_(0.99 < t1 - t0 < 1.2, t1 - t0)
+
+ self.assertEqual(0, len(self.conn.notifies))
+ self.assertEqual(extensions.POLL_OK, self.conn.poll())
+ self.assertEqual(1, len(self.conn.notifies))
+ self.assertEqual('foo', self.conn.notifies[0][1])
+
+ def test_many_notifies(self):
+ self.autocommit(self.conn)
+ for name in ['foo', 'bar', 'baz']:
+ self.listen(name)
+
+ for name in ['foo', 'bar', 'baz', 'qux']:
+ self.notify(name).wait()
+
+ self.assertEqual(0, len(self.conn.notifies))
+ self.assertEqual(extensions.POLL_OK, self.conn.poll())
+ self.assertEqual(3, len(self.conn.notifies))
+ names = [n[1] for n in self.conn.notifies]
+ for name in ['foo', 'bar', 'baz']:
+ self.assert_(name in names, name)
+
+ def test_notifies_received_on_execute(self):
+ self.autocommit(self.conn)
+ self.listen('foo')
+ self.notify('foo').wait()
+ self.assertEqual(0, len(self.conn.notifies))
+ self.conn.cursor().execute('select 1;')
+ self.assertEqual(1, len(self.conn.notifies))
+ self.assertEqual('foo', self.conn.notifies[0][1])
+
+def test_suite():
+ return unittest.TestLoader().loadTestsFromName(__name__)
+
+if __name__ == "__main__":
+ unittest.main()
+