summaryrefslogtreecommitdiff
path: root/Lib/test/test_kqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_kqueue.py')
-rw-r--r--Lib/test/test_kqueue.py166
1 files changed, 166 insertions, 0 deletions
diff --git a/Lib/test/test_kqueue.py b/Lib/test/test_kqueue.py
new file mode 100644
index 0000000000..310eb33962
--- /dev/null
+++ b/Lib/test/test_kqueue.py
@@ -0,0 +1,166 @@
+"""
+Tests for kqueue wrapper.
+"""
+import socket
+import errno
+import time
+import select
+import sys
+import unittest
+
+from test import test_support
+if not hasattr(select, "kqueue"):
+ raise test_support.TestSkipped("test works only on BSD")
+
+class TestKQueue(unittest.TestCase):
+ def test_create_queue(self):
+ kq = select.kqueue()
+ self.assert_(kq.fileno() > 0, kq.fileno())
+ self.assert_(not kq.closed)
+ kq.close()
+ self.assert_(kq.closed)
+ self.assertRaises(ValueError, kq.fileno)
+
+ def test_create_event(self):
+ fd = sys.stderr.fileno()
+ ev = select.kevent(fd)
+ other = select.kevent(1000)
+ self.assertEqual(ev.ident, fd)
+ self.assertEqual(ev.filter, select.KQ_FILTER_READ)
+ self.assertEqual(ev.flags, select.KQ_EV_ADD)
+ self.assertEqual(ev.fflags, 0)
+ self.assertEqual(ev.data, 0)
+ self.assertEqual(ev.udata, 0)
+ self.assertEqual(ev, ev)
+ self.assertNotEqual(ev, other)
+ self.assertEqual(cmp(ev, other), -1)
+ self.assert_(ev < other)
+ self.assert_(other >= ev)
+ self.assertRaises(TypeError, cmp, ev, None)
+ self.assertRaises(TypeError, cmp, ev, 1)
+ self.assertRaises(TypeError, cmp, ev, "ev")
+
+ ev = select.kevent(fd, select.KQ_FILTER_WRITE)
+ self.assertEqual(ev.ident, fd)
+ self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
+ self.assertEqual(ev.flags, select.KQ_EV_ADD)
+ self.assertEqual(ev.fflags, 0)
+ self.assertEqual(ev.data, 0)
+ self.assertEqual(ev.udata, 0)
+ self.assertEqual(ev, ev)
+ self.assertNotEqual(ev, other)
+
+ ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
+ self.assertEqual(ev.ident, fd)
+ self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
+ self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
+ self.assertEqual(ev.fflags, 0)
+ self.assertEqual(ev.data, 0)
+ self.assertEqual(ev.udata, 0)
+ self.assertEqual(ev, ev)
+ self.assertNotEqual(ev, other)
+
+ ev = select.kevent(1, 2, 3, 4, 5, 6)
+ self.assertEqual(ev.ident, 1)
+ self.assertEqual(ev.filter, 2)
+ self.assertEqual(ev.flags, 3)
+ self.assertEqual(ev.fflags, 4)
+ self.assertEqual(ev.data, 5)
+ self.assertEqual(ev.udata, 6)
+ self.assertEqual(ev, ev)
+ self.assertNotEqual(ev, other)
+
+ def test_queue_event(self):
+ serverSocket = socket.socket()
+ serverSocket.bind(('127.0.0.1', 0))
+ serverSocket.listen(1)
+ client = socket.socket()
+ client.setblocking(False)
+ try:
+ client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
+ except socket.error as e:
+ self.assertEquals(e.args[0], errno.EINPROGRESS)
+ else:
+ #raise AssertionError("Connect should have raised EINPROGRESS")
+ pass # FreeBSD doesn't raise an exception here
+ server, addr = serverSocket.accept()
+
+ if sys.platform.startswith("darwin"):
+ flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
+ else:
+ flags = 0
+
+ kq = select.kqueue()
+ kq2 = select.kqueue.fromfd(kq.fileno())
+
+ ev = select.kevent(server.fileno(),
+ select.KQ_FILTER_WRITE,
+ select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq.control([ev], 0)
+ ev = select.kevent(server.fileno(),
+ select.KQ_FILTER_READ,
+ select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq.control([ev], 0)
+ ev = select.kevent(client.fileno(),
+ select.KQ_FILTER_WRITE,
+ select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq2.control([ev], 0)
+ ev = select.kevent(client.fileno(),
+ select.KQ_FILTER_READ,
+ select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq2.control([ev], 0)
+
+ events = kq.control(None, 4, 1)
+ events = [(e.ident, e.filter, e.flags) for e in events]
+ events.sort()
+ self.assertEquals(events, [
+ (client.fileno(), select.KQ_FILTER_WRITE, flags),
+ (server.fileno(), select.KQ_FILTER_WRITE, flags)])
+
+ client.send(b"Hello!")
+ server.send(b"world!!!")
+
+ events = kq.control(None, 4, 1)
+ # We may need to call it several times
+ for i in range(5):
+ if len(events) == 4:
+ break
+ events = kq.control(None, 4, 1)
+ events = [(e.ident, e.filter, e.flags) for e in events]
+ events.sort()
+
+ self.assertEquals(events, [
+ (client.fileno(), select.KQ_FILTER_WRITE, flags),
+ (client.fileno(), select.KQ_FILTER_READ, flags),
+ (server.fileno(), select.KQ_FILTER_WRITE, flags),
+ (server.fileno(), select.KQ_FILTER_READ, flags)])
+
+ # Remove completely client, and server read part
+ ev = select.kevent(client.fileno(),
+ select.KQ_FILTER_WRITE,
+ select.KQ_EV_DELETE)
+ kq.control([ev], 0)
+ ev = select.kevent(client.fileno(),
+ select.KQ_FILTER_READ,
+ select.KQ_EV_DELETE)
+ kq.control([ev], 0)
+ ev = select.kevent(server.fileno(),
+ select.KQ_FILTER_READ,
+ select.KQ_EV_DELETE)
+ kq.control([ev], 0, 0)
+
+ events = kq.control([], 4, 0.99)
+ events = [(e.ident, e.filter, e.flags) for e in events]
+ events.sort()
+ self.assertEquals(events, [
+ (server.fileno(), select.KQ_FILTER_WRITE, flags)])
+
+ client.close()
+ server.close()
+ serverSocket.close()
+
+def test_main():
+ test_support.run_unittest(TestKQueue)
+
+if __name__ == "__main__":
+ test_main()