summaryrefslogtreecommitdiff
path: root/kazoo/tests/test_selectors_select.py
blob: 19b224d13e543ed0c675345d1bd77f2fdd3fa334 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""
The official python select function test case copied from python source
 to test the selector_select function.
"""

import errno
import os
import socket
import sys
import unittest

from kazoo.handlers.utils import selector_select

select = selector_select


@unittest.skipIf(
    (sys.platform[:3] == "win"), "can't easily test on this system"
)
class SelectTestCase(unittest.TestCase):
    class Nope:
        pass

    class Almost:
        def fileno(self):
            return "fileno"

    def test_error_conditions(self):
        self.assertRaises(TypeError, select, 1, 2, 3)
        self.assertRaises(TypeError, select, [self.Nope()], [], [])
        self.assertRaises(TypeError, select, [self.Almost()], [], [])
        self.assertRaises(TypeError, select, [], [], [], "not a number")
        self.assertRaises(ValueError, select, [], [], [], -1)

    # Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606
    @unittest.skipIf(
        sys.platform.startswith("freebsd"),
        "skip because of a FreeBSD bug: kern/155606",
    )
    def test_errno(self):
        with open(__file__, "rb") as fp:
            fd = fp.fileno()
            fp.close()
            try:
                select([fd], [], [], 0)
            except OSError as err:
                self.assertEqual(err.errno, errno.EBADF)
            else:
                self.fail("exception not raised")

    def test_returned_list_identity(self):
        # See issue #8329
        r, w, x = select([], [], [], 1)
        self.assertIsNot(r, w)
        self.assertIsNot(r, x)
        self.assertIsNot(w, x)

    def test_select(self):
        cmd = "for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done"
        p = os.popen(cmd, "r")
        for tout in (0, 1, 2, 4, 8, 16) + (None,) * 10:
            rfd, wfd, xfd = select([p], [], [], tout)
            if (rfd, wfd, xfd) == ([], [], []):
                continue
            if (rfd, wfd, xfd) == ([p], [], []):
                line = p.readline()
                if not line:
                    break
                continue
            self.fail("Unexpected return values from select():", rfd, wfd, xfd)
        p.close()

    # Issue 16230: Crash on select resized list
    def test_select_mutated(self):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            a = []

            class F:
                def fileno(self):
                    del a[-1]
                    return s.fileno()

            a[:] = [F()] * 10
            self.assertEqual(select([], a, []), ([], a[:5], []))


if __name__ == "__main__":
    unittest.main()