diff options
| author | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-04-20 18:17:27 +0100 |
|---|---|---|
| committer | Daniele Varrazzo <daniele.varrazzo@gmail.com> | 2010-04-21 15:09:14 +0100 |
| commit | 02a28ff028b6f06625d2bb1dc4dadd639fbe3b43 (patch) | |
| tree | 5c6a6b94516af5f11937a98322805756b998222b /tests/test_notify.py | |
| parent | 12ef826d500b2f9bda97032ffa58eb53036272db (diff) | |
| download | psycopg2-02a28ff028b6f06625d2bb1dc4dadd639fbe3b43.tar.gz | |
Notifications are automatically read after each query.
Added tests for basic notifications process.
Diffstat (limited to 'tests/test_notify.py')
| -rwxr-xr-x | tests/test_notify.py | 100 |
1 files changed, 100 insertions, 0 deletions
diff --git a/tests/test_notify.py b/tests/test_notify.py new file mode 100755 index 0000000..722aecf --- /dev/null +++ b/tests/test_notify.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +import unittest + +import psycopg2 +from psycopg2 import extensions + +import time +import select +import signal +from subprocess import Popen + +import sys +if sys.version_info < (3,): + import tests +else: + import py3tests as tests + + +class NotifiesTests(unittest.TestCase): + + def setUp(self): + self.conn = psycopg2.connect(tests.dsn) + + def tearDown(self): + self.conn.close() + + def autocommit(self, conn): + """Set a connection in autocommit mode.""" + conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT) + + def listen(self, name): + """Start listening for a name on self.conn.""" + curs = self.conn.cursor() + curs.execute("LISTEN " + name) + curs.close() + + def notify(self, name, sec=0): + """Send a notification to the database, eventually after some time.""" + script = ("""\ +import time +time.sleep(%(sec)s) +import psycopg2 +import psycopg2.extensions +conn = psycopg2.connect(%(dsn)r) +conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) +curs = conn.cursor() +curs.execute("NOTIFY " %(name)r) +curs.close() +conn.close() +""" + % { 'dsn': tests.dsn, 'sec': sec, 'name': name}) + + return Popen([sys.executable, '-c', script]) + + def test_notifies_received_on_poll(self): + self.autocommit(self.conn) + self.listen('foo') + + self.notify('foo', 1); + + t0 = time.time() + ready = select.select([self.conn], [], [], 2) + t1 = time.time() + self.assert_(0.99 < t1 - t0 < 1.2, t1 - t0) + + self.assertEqual(0, len(self.conn.notifies)) + self.assertEqual(extensions.POLL_OK, self.conn.poll()) + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual('foo', self.conn.notifies[0][1]) + + def test_many_notifies(self): + self.autocommit(self.conn) + for name in ['foo', 'bar', 'baz']: + self.listen(name) + + for name in ['foo', 'bar', 'baz', 'qux']: + self.notify(name).wait() + + self.assertEqual(0, len(self.conn.notifies)) + self.assertEqual(extensions.POLL_OK, self.conn.poll()) + self.assertEqual(3, len(self.conn.notifies)) + names = [n[1] for n in self.conn.notifies] + for name in ['foo', 'bar', 'baz']: + self.assert_(name in names, name) + + def test_notifies_received_on_execute(self): + self.autocommit(self.conn) + self.listen('foo') + self.notify('foo').wait() + self.assertEqual(0, len(self.conn.notifies)) + self.conn.cursor().execute('select 1;') + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual('foo', self.conn.notifies[0][1]) + +def test_suite(): + return unittest.TestLoader().loadTestsFromName(__name__) + +if __name__ == "__main__": + unittest.main() + |
