summaryrefslogtreecommitdiff
path: root/tests/test_notify.py
blob: 722aecf55b840cb7b5a4e75383daba6ac388ac56 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python
import unittest

import psycopg2
from psycopg2 import extensions

import time
import select
import signal
from subprocess import Popen

import sys
if sys.version_info < (3,):
    import tests
else:
    import py3tests as tests


class NotifiesTests(unittest.TestCase):

    def setUp(self):
        self.conn = psycopg2.connect(tests.dsn)

    def tearDown(self):
        self.conn.close()

    def autocommit(self, conn):
        """Set a connection in autocommit mode."""
        conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)

    def listen(self, name):
        """Start listening for a name on self.conn."""
        curs = self.conn.cursor()
        curs.execute("LISTEN " + name)
        curs.close()

    def notify(self, name, sec=0):
        """Send a notification to the database, eventually after some time."""
        script = ("""\
import time
time.sleep(%(sec)s)
import psycopg2
import psycopg2.extensions
conn = psycopg2.connect(%(dsn)r)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
curs.execute("NOTIFY " %(name)r)
curs.close()
conn.close()
"""
            % { 'dsn': tests.dsn, 'sec': sec, 'name': name})

        return Popen([sys.executable, '-c', script])

    def test_notifies_received_on_poll(self):
        self.autocommit(self.conn)
        self.listen('foo')

        self.notify('foo', 1);

        t0 = time.time()
        ready = select.select([self.conn], [], [], 2)
        t1 = time.time()
        self.assert_(0.99 < t1 - t0 < 1.2, t1 - t0)

        self.assertEqual(0, len(self.conn.notifies))
        self.assertEqual(extensions.POLL_OK, self.conn.poll())
        self.assertEqual(1, len(self.conn.notifies))
        self.assertEqual('foo', self.conn.notifies[0][1])

    def test_many_notifies(self):
        self.autocommit(self.conn)
        for name in ['foo', 'bar', 'baz']:
            self.listen(name)

        for name in ['foo', 'bar', 'baz', 'qux']:
            self.notify(name).wait()

        self.assertEqual(0, len(self.conn.notifies))
        self.assertEqual(extensions.POLL_OK, self.conn.poll())
        self.assertEqual(3, len(self.conn.notifies))
        names = [n[1] for n in self.conn.notifies]
        for name in ['foo', 'bar', 'baz']:
            self.assert_(name in names, name)

    def test_notifies_received_on_execute(self):
        self.autocommit(self.conn)
        self.listen('foo')
        self.notify('foo').wait()
        self.assertEqual(0, len(self.conn.notifies))
        self.conn.cursor().execute('select 1;')
        self.assertEqual(1, len(self.conn.notifies))
        self.assertEqual('foo', self.conn.notifies[0][1])

def test_suite():
    return unittest.TestLoader().loadTestsFromName(__name__)

if __name__ == "__main__":
    unittest.main()