summaryrefslogtreecommitdiff
path: root/tests/test_notify.py
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-10-15 09:42:44 +0100
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-11-05 09:34:46 +0000
commitf435d15c956e5bc99d907af8f41f0985380ce591 (patch)
treea90d5c84a1ef3de2828210913482472776236f28 /tests/test_notify.py
parente651308287fa91b7f3f633a56ef0d02ad563c872 (diff)
downloadpsycopg2-f435d15c956e5bc99d907af8f41f0985380ce591.tar.gz
Adding Notify object with payload.
Diffstat (limited to 'tests/test_notify.py')
-rwxr-xr-xtests/test_notify.py39
1 files changed, 36 insertions, 3 deletions
diff --git a/tests/test_notify.py b/tests/test_notify.py
index 8793aa2..acad27c 100755
--- a/tests/test_notify.py
+++ b/tests/test_notify.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
import unittest
+import warnings
import psycopg2
from psycopg2 import extensions
@@ -34,8 +35,13 @@ class NotifiesTests(unittest.TestCase):
curs.execute("LISTEN " + name)
curs.close()
- def notify(self, name, sec=0):
+ def notify(self, name, sec=0, payload=None):
"""Send a notification to the database, eventually after some time."""
+ if payload is None:
+ payload = ''
+ else:
+ payload = ", %r" % payload
+
script = ("""\
import time
time.sleep(%(sec)s)
@@ -45,11 +51,11 @@ conn = psycopg2.connect(%(dsn)r)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
print conn.get_backend_pid()
curs = conn.cursor()
-curs.execute("NOTIFY " %(name)r)
+curs.execute("NOTIFY " %(name)r %(payload)r)
curs.close()
conn.close()
"""
- % { 'dsn': tests.dsn, 'sec': sec, 'name': name})
+ % { 'dsn': tests.dsn, 'sec': sec, 'name': name, 'payload': payload})
return Popen([sys.executable, '-c', script], stdout=PIPE)
@@ -99,6 +105,33 @@ conn.close()
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])
+ def test_notify_attributes(self):
+ self.autocommit(self.conn)
+ self.listen('foo')
+ pid = int(self.notify('foo').communicate()[0])
+ self.conn.poll()
+ self.assertEqual(1, len(self.conn.notifies))
+ notify = self.conn.notifies[0]
+ self.assertEqual(pid, notify.pid)
+ self.assertEqual('foo', notify.channel)
+ self.assertEqual('', notify.payload)
+
+ def test_notify_payload(self):
+ if self.conn.server_version < 90000:
+ warnings.warn("server version %s doesn't support notify payload: skipping test"
+ % self.conn.server_version)
+ return
+ self.autocommit(self.conn)
+ self.listen('foo')
+ pid = int(self.notify('foo', payload="Hello, world!").communicate()[0])
+ self.conn.poll()
+ self.assertEqual(1, len(self.conn.notifies))
+ notify = self.conn.notifies[0]
+ self.assertEqual(pid, notify.pid)
+ self.assertEqual('foo', notify.channel)
+ self.assertEqual('Hello, world!', notify.payload)
+
+
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)