summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2015-01-09 15:16:08 +0100
committerVictor Stinner <victor.stinner@gmail.com>2015-01-09 15:16:08 +0100
commitc1d8fa36d1fe74e4c357d7421c997c834d717af6 (patch)
tree6aa35822c92fd32f59ed8761c4e7d5817cc1d558
parent4017697430eb5ae36d275be8f6ad64107404b13c (diff)
downloadtrollius-c1d8fa36d1fe74e4c357d7421c997c834d717af6.tar.gz
Replace test_selectors.py with the file of Python 3.5 adapted for asyncio and
Python 3.3 * Use time.time if time.monotonic is not available * Get socketpair from asyncio.test_utils * Get selectors from asyncio.selectors
-rw-r--r--tests/test_selectors.py556
1 files changed, 395 insertions, 161 deletions
diff --git a/tests/test_selectors.py b/tests/test_selectors.py
index d91c78b..3d5ef91 100644
--- a/tests/test_selectors.py
+++ b/tests/test_selectors.py
@@ -1,214 +1,448 @@
-"""Tests for selectors.py."""
-
+import errno
+import os
+import random
+import signal
+import sys
+from test import support
+from time import sleep
import unittest
-from unittest import mock
-
+import unittest.mock
+try:
+ from time import monotonic as time
+except ImportError:
+ from time import time as time
+try:
+ import resource
+except ImportError:
+ resource = None
from asyncio import selectors
+from asyncio.test_utils import socketpair
-class FakeSelector(selectors._BaseSelectorImpl):
- """Trivial non-abstract subclass of BaseSelector."""
+def find_ready_matching(ready, flag):
+ match = []
+ for key, events in ready:
+ if events & flag:
+ match.append(key.fileobj)
+ return match
- def select(self, timeout=None):
- raise NotImplementedError
+class BaseSelectorTestCase(unittest.TestCase):
-class _SelectorMappingTests(unittest.TestCase):
+ def make_socketpair(self):
+ rd, wr = socketpair()
+ self.addCleanup(rd.close)
+ self.addCleanup(wr.close)
+ return rd, wr
- def test_len(self):
- s = FakeSelector()
- map = selectors._SelectorMapping(s)
- self.assertTrue(map.__len__() == 0)
-
- f = mock.Mock()
- f.fileno.return_value = 10
- s.register(f, selectors.EVENT_READ, None)
- self.assertTrue(len(map) == 1)
+ def test_register(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
- def test_getitem(self):
- s = FakeSelector()
- map = selectors._SelectorMapping(s)
- f = mock.Mock()
- f.fileno.return_value = 10
- s.register(f, selectors.EVENT_READ, None)
- attended = selectors.SelectorKey(f, 10, selectors.EVENT_READ, None)
- self.assertEqual(attended, map.__getitem__(f))
+ rd, wr = self.make_socketpair()
- def test_getitem_key_error(self):
- s = FakeSelector()
- map = selectors._SelectorMapping(s)
- self.assertTrue(len(map) == 0)
- f = mock.Mock()
- f.fileno.return_value = 10
- s.register(f, selectors.EVENT_READ, None)
- self.assertRaises(KeyError, map.__getitem__, 5)
+ key = s.register(rd, selectors.EVENT_READ, "data")
+ self.assertIsInstance(key, selectors.SelectorKey)
+ self.assertEqual(key.fileobj, rd)
+ self.assertEqual(key.fd, rd.fileno())
+ self.assertEqual(key.events, selectors.EVENT_READ)
+ self.assertEqual(key.data, "data")
- def test_iter(self):
- s = FakeSelector()
- map = selectors._SelectorMapping(s)
- self.assertTrue(len(map) == 0)
- f = mock.Mock()
- f.fileno.return_value = 5
- s.register(f, selectors.EVENT_READ, None)
- counter = 0
- for fileno in map.__iter__():
- self.assertEqual(5, fileno)
- counter += 1
+ # register an unknown event
+ self.assertRaises(ValueError, s.register, 0, 999999)
- for idx in map:
- self.assertEqual(f, map[idx].fileobj)
- self.assertEqual(1, counter)
+ # register an invalid FD
+ self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)
+ # register twice
+ self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)
-class BaseSelectorTests(unittest.TestCase):
- def test_fileobj_to_fd(self):
- self.assertEqual(10, selectors._fileobj_to_fd(10))
+ # register the same FD, but with a different object
+ self.assertRaises(KeyError, s.register, rd.fileno(),
+ selectors.EVENT_READ)
- f = mock.Mock()
- f.fileno.return_value = 10
- self.assertEqual(10, selectors._fileobj_to_fd(f))
+ def test_unregister(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+
+ rd, wr = self.make_socketpair()
+
+ s.register(rd, selectors.EVENT_READ)
+ s.unregister(rd)
+
+ # unregister an unknown file obj
+ self.assertRaises(KeyError, s.unregister, 999999)
+
+ # unregister twice
+ self.assertRaises(KeyError, s.unregister, rd)
+
+ def test_unregister_after_fd_close(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+ rd, wr = self.make_socketpair()
+ r, w = rd.fileno(), wr.fileno()
+ s.register(r, selectors.EVENT_READ)
+ s.register(w, selectors.EVENT_WRITE)
+ rd.close()
+ wr.close()
+ s.unregister(r)
+ s.unregister(w)
+
+ @unittest.skipUnless(os.name == 'posix', "requires posix")
+ def test_unregister_after_fd_close_and_reuse(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+ rd, wr = self.make_socketpair()
+ r, w = rd.fileno(), wr.fileno()
+ s.register(r, selectors.EVENT_READ)
+ s.register(w, selectors.EVENT_WRITE)
+ rd2, wr2 = self.make_socketpair()
+ rd.close()
+ wr.close()
+ os.dup2(rd2.fileno(), r)
+ os.dup2(wr2.fileno(), w)
+ self.addCleanup(os.close, r)
+ self.addCleanup(os.close, w)
+ s.unregister(r)
+ s.unregister(w)
+
+ def test_unregister_after_socket_close(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+ rd, wr = self.make_socketpair()
+ s.register(rd, selectors.EVENT_READ)
+ s.register(wr, selectors.EVENT_WRITE)
+ rd.close()
+ wr.close()
+ s.unregister(rd)
+ s.unregister(wr)
- f.fileno.side_effect = AttributeError
- self.assertRaises(ValueError, selectors._fileobj_to_fd, f)
+ def test_modify(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
- f.fileno.return_value = -1
- self.assertRaises(ValueError, selectors._fileobj_to_fd, f)
+ rd, wr = self.make_socketpair()
- def test_selector_key_repr(self):
- key = selectors.SelectorKey(10, 10, selectors.EVENT_READ, None)
- self.assertEqual(
- "SelectorKey(fileobj=10, fd=10, events=1, data=None)", repr(key))
+ key = s.register(rd, selectors.EVENT_READ)
- def test_register(self):
- fobj = mock.Mock()
- fobj.fileno.return_value = 10
+ # modify events
+ key2 = s.modify(rd, selectors.EVENT_WRITE)
+ self.assertNotEqual(key.events, key2.events)
+ self.assertEqual(key2, s.get_key(rd))
- s = FakeSelector()
- key = s.register(fobj, selectors.EVENT_READ)
- self.assertIsInstance(key, selectors.SelectorKey)
- self.assertEqual(key.fd, 10)
- self.assertIs(key, s._fd_to_key[10])
+ s.unregister(rd)
- def test_register_unknown_event(self):
- s = FakeSelector()
- self.assertRaises(ValueError, s.register, mock.Mock(), 999999)
+ # modify data
+ d1 = object()
+ d2 = object()
- def test_register_already_registered(self):
- fobj = mock.Mock()
- fobj.fileno.return_value = 10
+ key = s.register(rd, selectors.EVENT_READ, d1)
+ key2 = s.modify(rd, selectors.EVENT_READ, d2)
+ self.assertEqual(key.events, key2.events)
+ self.assertNotEqual(key.data, key2.data)
+ self.assertEqual(key2, s.get_key(rd))
+ self.assertEqual(key2.data, d2)
- s = FakeSelector()
- s.register(fobj, selectors.EVENT_READ)
- self.assertRaises(KeyError, s.register, fobj, selectors.EVENT_READ)
+ # modify unknown file obj
+ self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)
- def test_unregister(self):
- fobj = mock.Mock()
- fobj.fileno.return_value = 10
+ # modify use a shortcut
+ d3 = object()
+ s.register = unittest.mock.Mock()
+ s.unregister = unittest.mock.Mock()
- s = FakeSelector()
- s.register(fobj, selectors.EVENT_READ)
- s.unregister(fobj)
- self.assertFalse(s._fd_to_key)
+ s.modify(rd, selectors.EVENT_READ, d3)
+ self.assertFalse(s.register.called)
+ self.assertFalse(s.unregister.called)
- def test_unregister_unknown(self):
- fobj = mock.Mock()
- fobj.fileno.return_value = 10
+ def test_close(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
- s = FakeSelector()
- self.assertRaises(KeyError, s.unregister, fobj)
+ rd, wr = self.make_socketpair()
- def test_modify_unknown(self):
- fobj = mock.Mock()
- fobj.fileno.return_value = 10
+ s.register(rd, selectors.EVENT_READ)
+ s.register(wr, selectors.EVENT_WRITE)
- s = FakeSelector()
- self.assertRaises(KeyError, s.modify, fobj, 1)
+ s.close()
+ self.assertRaises(KeyError, s.get_key, rd)
+ self.assertRaises(KeyError, s.get_key, wr)
- def test_modify(self):
- fobj = mock.Mock()
- fobj.fileno.return_value = 10
+ def test_get_key(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
- s = FakeSelector()
- key = s.register(fobj, selectors.EVENT_READ)
- key2 = s.modify(fobj, selectors.EVENT_WRITE)
- self.assertNotEqual(key.events, key2.events)
- self.assertEqual(
- selectors.SelectorKey(fobj, 10, selectors.EVENT_WRITE, None),
- s.get_key(fobj))
+ rd, wr = self.make_socketpair()
- def test_modify_data(self):
- fobj = mock.Mock()
- fobj.fileno.return_value = 10
+ key = s.register(rd, selectors.EVENT_READ, "data")
+ self.assertEqual(key, s.get_key(rd))
- d1 = object()
- d2 = object()
+ # unknown file obj
+ self.assertRaises(KeyError, s.get_key, 999999)
- s = FakeSelector()
- key = s.register(fobj, selectors.EVENT_READ, d1)
- key2 = s.modify(fobj, selectors.EVENT_READ, d2)
- self.assertEqual(key.events, key2.events)
- self.assertNotEqual(key.data, key2.data)
- self.assertEqual(
- selectors.SelectorKey(fobj, 10, selectors.EVENT_READ, d2),
- s.get_key(fobj))
+ def test_get_map(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
- def test_modify_data_use_a_shortcut(self):
- fobj = mock.Mock()
- fobj.fileno.return_value = 10
+ rd, wr = self.make_socketpair()
- d1 = object()
- d2 = object()
+ keys = s.get_map()
+ self.assertFalse(keys)
+ self.assertEqual(len(keys), 0)
+ self.assertEqual(list(keys), [])
+ key = s.register(rd, selectors.EVENT_READ, "data")
+ self.assertIn(rd, keys)
+ self.assertEqual(key, keys[rd])
+ self.assertEqual(len(keys), 1)
+ self.assertEqual(list(keys), [rd.fileno()])
+ self.assertEqual(list(keys.values()), [key])
- s = FakeSelector()
- s.register(fobj, selectors.EVENT_READ, d1)
+ # unknown file obj
+ with self.assertRaises(KeyError):
+ keys[999999]
- s.unregister = mock.Mock()
- s.register = mock.Mock()
- s.modify(fobj, selectors.EVENT_READ, d2)
- self.assertFalse(s.unregister.called)
- self.assertFalse(s.register.called)
+ # Read-only mapping
+ with self.assertRaises(TypeError):
+ del keys[rd]
- def test_modify_same(self):
- fobj = mock.Mock()
- fobj.fileno.return_value = 10
+ def test_select(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
- data = object()
+ rd, wr = self.make_socketpair()
- s = FakeSelector()
- key = s.register(fobj, selectors.EVENT_READ, data)
- key2 = s.modify(fobj, selectors.EVENT_READ, data)
- self.assertIs(key, key2)
+ s.register(rd, selectors.EVENT_READ)
+ wr_key = s.register(wr, selectors.EVENT_WRITE)
- def test_select(self):
- s = FakeSelector()
- self.assertRaises(NotImplementedError, s.select)
+ result = s.select()
+ for key, events in result:
+ self.assertTrue(isinstance(key, selectors.SelectorKey))
+ self.assertTrue(events)
+ self.assertFalse(events & ~(selectors.EVENT_READ |
+ selectors.EVENT_WRITE))
- def test_close(self):
- s = FakeSelector()
- s.register(1, selectors.EVENT_READ)
-
- s.close()
- self.assertFalse(s._fd_to_key)
+ self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)
def test_context_manager(self):
- s = FakeSelector()
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+
+ rd, wr = self.make_socketpair()
with s as sel:
- sel.register(1, selectors.EVENT_READ)
+ sel.register(rd, selectors.EVENT_READ)
+ sel.register(wr, selectors.EVENT_WRITE)
+
+ self.assertRaises(KeyError, s.get_key, rd)
+ self.assertRaises(KeyError, s.get_key, wr)
+
+ def test_fileno(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+
+ if hasattr(s, 'fileno'):
+ fd = s.fileno()
+ self.assertTrue(isinstance(fd, int))
+ self.assertGreaterEqual(fd, 0)
+
+ def test_selector(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+
+ NUM_SOCKETS = 12
+ MSG = b" This is a test."
+ MSG_LEN = len(MSG)
+ readers = []
+ writers = []
+ r2w = {}
+ w2r = {}
+
+ for i in range(NUM_SOCKETS):
+ rd, wr = self.make_socketpair()
+ s.register(rd, selectors.EVENT_READ)
+ s.register(wr, selectors.EVENT_WRITE)
+ readers.append(rd)
+ writers.append(wr)
+ r2w[rd] = wr
+ w2r[wr] = rd
+
+ bufs = []
+
+ while writers:
+ ready = s.select()
+ ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)
+ if not ready_writers:
+ self.fail("no sockets ready for writing")
+ wr = random.choice(ready_writers)
+ wr.send(MSG)
+
+ for i in range(10):
+ ready = s.select()
+ ready_readers = find_ready_matching(ready,
+ selectors.EVENT_READ)
+ if ready_readers:
+ break
+ # there might be a delay between the write to the write end and
+ # the read end is reported ready
+ sleep(0.1)
+ else:
+ self.fail("no sockets ready for reading")
+ self.assertEqual([w2r[wr]], ready_readers)
+ rd = ready_readers[0]
+ buf = rd.recv(MSG_LEN)
+ self.assertEqual(len(buf), MSG_LEN)
+ bufs.append(buf)
+ s.unregister(r2w[rd])
+ s.unregister(rd)
+ writers.remove(r2w[rd])
+
+ self.assertEqual(bufs, [MSG] * NUM_SOCKETS)
+
+ @unittest.skipIf(sys.platform == 'win32',
+ 'select.select() cannot be used with empty fd sets')
+ def test_empty_select(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+ self.assertEqual(s.select(timeout=0), [])
+
+ def test_timeout(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+
+ rd, wr = self.make_socketpair()
+
+ s.register(wr, selectors.EVENT_WRITE)
+ t = time()
+ self.assertEqual(1, len(s.select(0)))
+ self.assertEqual(1, len(s.select(-1)))
+ self.assertLess(time() - t, 0.5)
+
+ s.unregister(wr)
+ s.register(rd, selectors.EVENT_READ)
+ t = time()
+ self.assertFalse(s.select(0))
+ self.assertFalse(s.select(-1))
+ self.assertLess(time() - t, 0.5)
+
+ t0 = time()
+ self.assertFalse(s.select(1))
+ t1 = time()
+ dt = t1 - t0
+ # Tolerate 2.0 seconds for very slow buildbots
+ self.assertTrue(0.8 <= dt <= 2.0, dt)
+
+ @unittest.skipUnless(hasattr(signal, "alarm"),
+ "signal.alarm() required for this test")
+ def test_select_interrupt(self):
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+
+ rd, wr = self.make_socketpair()
+
+ orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
+ self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
+ self.addCleanup(signal.alarm, 0)
+
+ signal.alarm(1)
+
+ s.register(rd, selectors.EVENT_READ)
+ t = time()
+ self.assertFalse(s.select(2))
+ self.assertLess(time() - t, 2.5)
+
+
+class ScalableSelectorMixIn:
+
+ # see issue #18963 for why it's skipped on older OS X versions
+ @support.requires_mac_ver(10, 5)
+ @unittest.skipUnless(resource, "Test needs resource module")
+ def test_above_fd_setsize(self):
+ # A scalable implementation should have no problem with more than
+ # FD_SETSIZE file descriptors. Since we don't know the value, we just
+ # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.
+ soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+ try:
+ resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
+ self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
+ (soft, hard))
+ NUM_FDS = min(hard, 2**16)
+ except (OSError, ValueError):
+ NUM_FDS = soft
+
+ # guard for already allocated FDs (stdin, stdout...)
+ NUM_FDS -= 32
+
+ s = self.SELECTOR()
+ self.addCleanup(s.close)
+
+ for i in range(NUM_FDS // 2):
+ try:
+ rd, wr = self.make_socketpair()
+ except OSError:
+ # too many FDs, skip - note that we should only catch EMFILE
+ # here, but apparently *BSD and Solaris can fail upon connect()
+ # or bind() with EADDRNOTAVAIL, so let's be safe
+ self.skipTest("FD limit reached")
+
+ try:
+ s.register(rd, selectors.EVENT_READ)
+ s.register(wr, selectors.EVENT_WRITE)
+ except OSError as e:
+ if e.errno == errno.ENOSPC:
+ # this can be raised by epoll if we go over
+ # fs.epoll.max_user_watches sysctl
+ self.skipTest("FD limit reached")
+ raise
+
+ self.assertEqual(NUM_FDS // 2, len(s.select()))
+
+
+class DefaultSelectorTestCase(BaseSelectorTestCase):
+
+ SELECTOR = selectors.DefaultSelector
+
+
+class SelectSelectorTestCase(BaseSelectorTestCase):
+
+ SELECTOR = selectors.SelectSelector
+
+
+@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
+ "Test needs selectors.PollSelector")
+class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
+
+ SELECTOR = getattr(selectors, 'PollSelector', None)
+
+
+@unittest.skipUnless(hasattr(selectors, 'EpollSelector'),
+ "Test needs selectors.EpollSelector")
+class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
+
+ SELECTOR = getattr(selectors, 'EpollSelector', None)
+
+
+@unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),
+ "Test needs selectors.KqueueSelector)")
+class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
+
+ SELECTOR = getattr(selectors, 'KqueueSelector', None)
+
+
+@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'),
+ "Test needs selectors.DevpollSelector")
+class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):
- self.assertFalse(s._fd_to_key)
+ SELECTOR = getattr(selectors, 'DevpollSelector', None)
- def test_key_from_fd(self):
- s = FakeSelector()
- key = s.register(1, selectors.EVENT_READ)
- self.assertIs(key, s._key_from_fd(1))
- self.assertIsNone(s._key_from_fd(10))
- if hasattr(selectors.DefaultSelector, 'fileno'):
- def test_fileno(self):
- self.assertIsInstance(selectors.DefaultSelector().fileno(), int)
+def test_main():
+ tests = [DefaultSelectorTestCase, SelectSelectorTestCase,
+ PollSelectorTestCase, EpollSelectorTestCase,
+ KqueueSelectorTestCase, DevpollSelectorTestCase]
+ support.run_unittest(*tests)
+ support.reap_children()
-if __name__ == '__main__':
- unittest.main()
+if __name__ == "__main__":
+ test_main()