diff options
| author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-10-15 09:42:44 +0100 |
|---|---|---|
| committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-11-05 09:34:46 +0000 |
| commit | f435d15c956e5bc99d907af8f41f0985380ce591 (patch) | |
| tree | a90d5c84a1ef3de2828210913482472776236f28 /tests/test_notify.py | |
| parent | e651308287fa91b7f3f633a56ef0d02ad563c872 (diff) | |
| download | psycopg2-f435d15c956e5bc99d907af8f41f0985380ce591.tar.gz | |
Adding Notify object with payload.
Diffstat (limited to 'tests/test_notify.py')
| -rwxr-xr-x | tests/test_notify.py | 39 |
1 files changed, 36 insertions, 3 deletions
diff --git a/tests/test_notify.py b/tests/test_notify.py index 8793aa2..acad27c 100755 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -1,5 +1,6 @@ #!/usr/bin/env python import unittest +import warnings import psycopg2 from psycopg2 import extensions @@ -34,8 +35,13 @@ class NotifiesTests(unittest.TestCase): curs.execute("LISTEN " + name) curs.close() - def notify(self, name, sec=0): + def notify(self, name, sec=0, payload=None): """Send a notification to the database, eventually after some time.""" + if payload is None: + payload = '' + else: + payload = ", %r" % payload + script = ("""\ import time time.sleep(%(sec)s) @@ -45,11 +51,11 @@ conn = psycopg2.connect(%(dsn)r) conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) print conn.get_backend_pid() curs = conn.cursor() -curs.execute("NOTIFY " %(name)r) +curs.execute("NOTIFY " %(name)r %(payload)r) curs.close() conn.close() """ - % { 'dsn': tests.dsn, 'sec': sec, 'name': name}) + % { 'dsn': tests.dsn, 'sec': sec, 'name': name, 'payload': payload}) return Popen([sys.executable, '-c', script], stdout=PIPE) @@ -99,6 +105,33 @@ conn.close() self.assertEqual(pid, self.conn.notifies[0][0]) self.assertEqual('foo', self.conn.notifies[0][1]) + def test_notify_attributes(self): + self.autocommit(self.conn) + self.listen('foo') + pid = int(self.notify('foo').communicate()[0]) + self.conn.poll() + self.assertEqual(1, len(self.conn.notifies)) + notify = self.conn.notifies[0] + self.assertEqual(pid, notify.pid) + self.assertEqual('foo', notify.channel) + self.assertEqual('', notify.payload) + + def test_notify_payload(self): + if self.conn.server_version < 90000: + warnings.warn("server version %s doesn't support notify payload: skipping test" + % self.conn.server_version) + return + self.autocommit(self.conn) + self.listen('foo') + pid = int(self.notify('foo', payload="Hello, world!").communicate()[0]) + self.conn.poll() + self.assertEqual(1, len(self.conn.notifies)) + notify = self.conn.notifies[0] + self.assertEqual(pid, notify.pid) + self.assertEqual('foo', notify.channel) + self.assertEqual('Hello, world!', notify.payload) + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) |
