summaryrefslogtreecommitdiff
path: root/tests/test_notify.py
diff options
context:
space:
mode:
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-04-20 18:17:27 +0100
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>2010-04-21 15:09:14 +0100
commit02a28ff028b6f06625d2bb1dc4dadd639fbe3b43 (patch)
tree5c6a6b94516af5f11937a98322805756b998222b /tests/test_notify.py
parent12ef826d500b2f9bda97032ffa58eb53036272db (diff)
downloadpsycopg2-02a28ff028b6f06625d2bb1dc4dadd639fbe3b43.tar.gz
Notifications are automatically read after each query.
Added tests for basic notifications process.
Diffstat (limited to 'tests/test_notify.py')
-rwxr-xr-xtests/test_notify.py100
1 files changed, 100 insertions, 0 deletions
diff --git a/tests/test_notify.py b/tests/test_notify.py
new file mode 100755
index 0000000..722aecf
--- /dev/null
+++ b/tests/test_notify.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+import unittest
+
+import psycopg2
+from psycopg2 import extensions
+
+import time
+import select
+import signal
+from subprocess import Popen
+
+import sys
+if sys.version_info < (3,):
+ import tests
+else:
+ import py3tests as tests
+
+
+class NotifiesTests(unittest.TestCase):
+
+ def setUp(self):
+ self.conn = psycopg2.connect(tests.dsn)
+
+ def tearDown(self):
+ self.conn.close()
+
+ def autocommit(self, conn):
+ """Set a connection in autocommit mode."""
+ conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+
+ def listen(self, name):
+ """Start listening for a name on self.conn."""
+ curs = self.conn.cursor()
+ curs.execute("LISTEN " + name)
+ curs.close()
+
+ def notify(self, name, sec=0):
+ """Send a notification to the database, eventually after some time."""
+ script = ("""\
+import time
+time.sleep(%(sec)s)
+import psycopg2
+import psycopg2.extensions
+conn = psycopg2.connect(%(dsn)r)
+conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+curs = conn.cursor()
+curs.execute("NOTIFY " %(name)r)
+curs.close()
+conn.close()
+"""
+ % { 'dsn': tests.dsn, 'sec': sec, 'name': name})
+
+ return Popen([sys.executable, '-c', script])
+
+ def test_notifies_received_on_poll(self):
+ self.autocommit(self.conn)
+ self.listen('foo')
+
+ self.notify('foo', 1);
+
+ t0 = time.time()
+ ready = select.select([self.conn], [], [], 2)
+ t1 = time.time()
+ self.assert_(0.99 < t1 - t0 < 1.2, t1 - t0)
+
+ self.assertEqual(0, len(self.conn.notifies))
+ self.assertEqual(extensions.POLL_OK, self.conn.poll())
+ self.assertEqual(1, len(self.conn.notifies))
+ self.assertEqual('foo', self.conn.notifies[0][1])
+
+ def test_many_notifies(self):
+ self.autocommit(self.conn)
+ for name in ['foo', 'bar', 'baz']:
+ self.listen(name)
+
+ for name in ['foo', 'bar', 'baz', 'qux']:
+ self.notify(name).wait()
+
+ self.assertEqual(0, len(self.conn.notifies))
+ self.assertEqual(extensions.POLL_OK, self.conn.poll())
+ self.assertEqual(3, len(self.conn.notifies))
+ names = [n[1] for n in self.conn.notifies]
+ for name in ['foo', 'bar', 'baz']:
+ self.assert_(name in names, name)
+
+ def test_notifies_received_on_execute(self):
+ self.autocommit(self.conn)
+ self.listen('foo')
+ self.notify('foo').wait()
+ self.assertEqual(0, len(self.conn.notifies))
+ self.conn.cursor().execute('select 1;')
+ self.assertEqual(1, len(self.conn.notifies))
+ self.assertEqual('foo', self.conn.notifies[0][1])
+
+def test_suite():
+ return unittest.TestLoader().loadTestsFromName(__name__)
+
+if __name__ == "__main__":
+ unittest.main()
+