summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2015-06-02 17:02:04 +0100
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2015-06-02 17:02:04 +0100
commit1f330e9cac9c5d40c33f4f58d0dbfc0109c62edc (patch)
tree0e4d67997ee734ac3025774a163c031e8a217b7b
parent2ad82b973b86fa71126657aacee89a30d2211894 (diff)
downloadpsycopg2-1f330e9cac9c5d40c33f4f58d0dbfc0109c62edc.tar.gz
Allow connection.notices and notifies to be replaced.
Close #326
-rw-r--r--NEWS3
-rw-r--r--doc/src/advanced.rst4
-rw-r--r--doc/src/connection.rst18
-rw-r--r--psycopg/connection_int.c69
-rw-r--r--psycopg/connection_type.c4
-rwxr-xr-xtests/test_connection.py36
-rwxr-xr-xtests/test_notify.py22
7 files changed, 134 insertions, 22 deletions
diff --git a/NEWS b/NEWS
index 4aebbff..fe6cea4 100644
--- a/NEWS
+++ b/NEWS
@@ -10,6 +10,9 @@ New features:
`~psycopg2.extensions.libpq_version()` to inspect the version of the
``libpq`` library the module was compiled/loaded with
(:tickets:`#35, #323`).
+- The attributes `~connection.notices` and `~connection.notifies` can be
+ customized replacing them with any object exposing an `!append()` method
+ (:ticket:`#326`).
What's new in psycopg 2.6.1
diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst
index eecbcfd..f0483ce 100644
--- a/doc/src/advanced.rst
+++ b/doc/src/advanced.rst
@@ -312,6 +312,10 @@ received from a previous version server will have the
Added `~psycopg2.extensions.Notify` object and handling notification
payload.
+.. versionchanged:: 2.7
+ The `~connection.notifies` attribute is writable: it is possible to
+ replace it with any object exposing an `!append()` method. An useful
+ example would be to use a `~collections.deque` object.
.. index::
diff --git a/doc/src/connection.rst b/doc/src/connection.rst
index 92178f3..cceef1e 100644
--- a/doc/src/connection.rst
+++ b/doc/src/connection.rst
@@ -483,13 +483,21 @@ The ``connection`` class
['NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"\n',
'NOTICE: CREATE TABLE will create implicit sequence "foo_id_seq" for serial column "foo.id"\n']
+ .. versionchanged:: 2.7
+ The `!notices` attribute is writable: the user may replace it
+ with any Python object exposing an `!append()` method. If
+ appending raises an exception the notice is silently
+ dropped.
+
To avoid a leak in case excessive notices are generated, only the last
- 50 messages are kept.
+ 50 messages are kept. This check is only in place if the `!notices`
+ attribute is a list: if any other object is used it will be up to the
+ user to guard from leakage.
You can configure what messages to receive using `PostgreSQL logging
configuration parameters`__ such as ``log_statement``,
``client_min_messages``, ``log_min_duration_statement`` etc.
-
+
.. __: http://www.postgresql.org/docs/current/static/runtime-config-logging.html
@@ -506,6 +514,12 @@ The ``connection`` class
the payload was not accessible. To keep backward compatibility,
`!Notify` objects can still be accessed as 2 items tuples.
+ .. versionchanged:: 2.7
+ The `!notifies` attribute is writable: the user may replace it
+ with any Python object exposing an `!append()` method. If
+ appending raises an exception the notification is silently
+ dropped.
+
.. attribute:: cursor_factory
diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c
index 40f7e6c..43d0fda 100644
--- a/psycopg/connection_int.c
+++ b/psycopg/connection_int.c
@@ -111,41 +111,60 @@ void
conn_notice_process(connectionObject *self)
{
struct connectionObject_notice *notice;
- Py_ssize_t nnotices;
+ PyObject *msg = NULL;
+ PyObject *tmp = NULL;
+ static PyObject *append;
if (NULL == self->notice_pending) {
return;
}
- notice = self->notice_pending;
+ if (!append) {
+ if (!(append = Text_FromUTF8("append"))) {
+ goto error;
+ }
+ }
+ notice = self->notice_pending;
while (notice != NULL) {
- PyObject *msg;
- msg = conn_text_from_chars(self, notice->message);
Dprintf("conn_notice_process: %s", notice->message);
- if (msg) {
- PyList_Append(self->notice_list, msg);
- Py_DECREF(msg);
- }
- else {
- /* We don't really have a way to report errors, so gulp it.
- * The function should only fail for out of memory, so we are
- * likely going to die anyway. */
- PyErr_Clear();
+ if (!(msg = conn_text_from_chars(self, notice->message))) { goto error; }
+
+ if (!(tmp = PyObject_CallMethodObjArgs(
+ self->notice_list, append, msg, NULL))) {
+
+ goto error;
}
+ Py_DECREF(tmp); tmp = NULL;
+ Py_DECREF(msg); msg = NULL;
+
notice = notice->next;
}
/* Remove the oldest item if the queue is getting too long. */
- nnotices = PyList_GET_SIZE(self->notice_list);
- if (nnotices > CONN_NOTICES_LIMIT) {
- PySequence_DelSlice(self->notice_list,
- 0, nnotices - CONN_NOTICES_LIMIT);
+ if (PyList_Check(self->notice_list)) {
+ Py_ssize_t nnotices;
+ nnotices = PyList_GET_SIZE(self->notice_list);
+ if (nnotices > CONN_NOTICES_LIMIT) {
+ if (-1 == PySequence_DelSlice(self->notice_list,
+ 0, nnotices - CONN_NOTICES_LIMIT)) {
+ PyErr_Clear();
+ }
+ }
}
conn_notice_clean(self);
+ return;
+
+error:
+ Py_XDECREF(tmp);
+ Py_XDECREF(msg);
+ conn_notice_clean(self);
+
+ /* TODO: the caller doesn't expects errors from us */
+ PyErr_Clear();
}
void
@@ -177,6 +196,15 @@ conn_notifies_process(connectionObject *self)
PGnotify *pgn = NULL;
PyObject *notify = NULL;
PyObject *pid = NULL, *channel = NULL, *payload = NULL;
+ PyObject *tmp = NULL;
+
+ static PyObject *append;
+
+ if (!append) {
+ if (!(append = Text_FromUTF8("append"))) {
+ goto error;
+ }
+ }
while ((pgn = PQnotifies(self->pgconn)) != NULL) {
@@ -196,7 +224,11 @@ conn_notifies_process(connectionObject *self)
Py_DECREF(channel); channel = NULL;
Py_DECREF(payload); payload = NULL;
- PyList_Append(self->notifies, (PyObject *)notify);
+ if (!(tmp = PyObject_CallMethodObjArgs(
+ self->notifies, append, notify, NULL))) {
+ goto error;
+ }
+ Py_DECREF(tmp); tmp = NULL;
Py_DECREF(notify); notify = NULL;
PQfreemem(pgn); pgn = NULL;
@@ -205,6 +237,7 @@ conn_notifies_process(connectionObject *self)
error:
if (pgn) { PQfreemem(pgn); }
+ Py_XDECREF(tmp);
Py_XDECREF(notify);
Py_XDECREF(pid);
Py_XDECREF(channel);
diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c
index 9931399..2c1dddf 100644
--- a/psycopg/connection_type.c
+++ b/psycopg/connection_type.c
@@ -1001,8 +1001,8 @@ static struct PyMemberDef connectionObject_members[] = {
"True if the connection is closed."},
{"encoding", T_STRING, offsetof(connectionObject, encoding), READONLY,
"The current client encoding."},
- {"notices", T_OBJECT, offsetof(connectionObject, notice_list), READONLY},
- {"notifies", T_OBJECT, offsetof(connectionObject, notifies), READONLY},
+ {"notices", T_OBJECT, offsetof(connectionObject, notice_list), 0},
+ {"notifies", T_OBJECT, offsetof(connectionObject, notifies), 0},
{"dsn", T_STRING, offsetof(connectionObject, dsn), READONLY,
"The current connection string."},
{"async", T_LONG, offsetof(connectionObject, async), READONLY,
diff --git a/tests/test_connection.py b/tests/test_connection.py
index 340693e..fa78eb3 100755
--- a/tests/test_connection.py
+++ b/tests/test_connection.py
@@ -129,6 +129,42 @@ class ConnectionTests(ConnectingTestCase):
self.assertEqual(50, len(conn.notices))
self.assert_('table99' in conn.notices[-1], conn.notices[-1])
+ def test_notices_deque(self):
+ from collections import deque
+
+ conn = self.conn
+ self.conn.notices = deque()
+ cur = conn.cursor()
+ if self.conn.server_version >= 90300:
+ cur.execute("set client_min_messages=debug1")
+
+ cur.execute("create temp table table1 (id serial); create temp table table2 (id serial);")
+ cur.execute("create temp table table3 (id serial); create temp table table4 (id serial);")
+ self.assertEqual(len(conn.notices), 4)
+ self.assert_('table1' in conn.notices.popleft())
+ self.assert_('table2' in conn.notices.popleft())
+ self.assert_('table3' in conn.notices.popleft())
+ self.assert_('table4' in conn.notices.popleft())
+ self.assertEqual(len(conn.notices), 0)
+
+ # not limited, but no error
+ for i in range(0, 100, 10):
+ sql = " ".join(["create temp table table2_%d (id serial);" % j for j in range(i, i+10)])
+ cur.execute(sql)
+
+ self.assertEqual(100, len(conn.notices))
+
+ def test_notices_noappend(self):
+ conn = self.conn
+ self.conn.notices = None # will make an error swallowes ok
+ cur = conn.cursor()
+ if self.conn.server_version >= 90300:
+ cur.execute("set client_min_messages=debug1")
+
+ cur.execute("create temp table table1 (id serial);")
+
+ self.assertEqual(self.conn.notices, None)
+
def test_server_version(self):
self.assert_(self.conn.server_version)
diff --git a/tests/test_notify.py b/tests/test_notify.py
index f838389..fc6224d 100755
--- a/tests/test_notify.py
+++ b/tests/test_notify.py
@@ -155,6 +155,27 @@ conn.close()
self.assertEqual('foo', notify.channel)
self.assertEqual('Hello, world!', notify.payload)
+ def test_notify_deque(self):
+ from collections import deque
+ self.autocommit(self.conn)
+ self.conn.notifies = deque()
+ self.listen('foo')
+ self.notify('foo').communicate()
+ time.sleep(0.5)
+ self.conn.poll()
+ notify = self.conn.notifies.popleft()
+ self.assert_(isinstance(notify, psycopg2.extensions.Notify))
+ self.assertEqual(len(self.conn.notifies), 0)
+
+ def test_notify_noappend(self):
+ self.autocommit(self.conn)
+ self.conn.notifies = None
+ self.listen('foo')
+ self.notify('foo').communicate()
+ time.sleep(0.5)
+ self.conn.poll()
+ self.assertEqual(self.conn.notifies, None)
+
def test_notify_init(self):
n = psycopg2.extensions.Notify(10, 'foo')
self.assertEqual(10, n.pid)
@@ -192,6 +213,7 @@ conn.close()
self.assertNotEqual(hash(Notify(10, 'foo', 'bar')),
hash(Notify(10, 'foo')))
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)