summaryrefslogtreecommitdiff
path: root/tests/test_wasyncore.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_wasyncore.py')
-rw-r--r--tests/test_wasyncore.py95
1 files changed, 68 insertions, 27 deletions
diff --git a/tests/test_wasyncore.py b/tests/test_wasyncore.py
index 9c23509..970e993 100644
--- a/tests/test_wasyncore.py
+++ b/tests/test_wasyncore.py
@@ -1,21 +1,21 @@
-from waitress import wasyncore as asyncore
-from waitress import compat
+import _thread as thread
import contextlib
+import errno
import functools
import gc
-import unittest
-import select
+from io import BytesIO
import os
-import socket
-import sys
-import time
-import errno
import re
+import select
+import socket
import struct
+import sys
import threading
+import time
+import unittest
import warnings
-from io import BytesIO
+from waitress import compat, wasyncore as asyncore
TIMEOUT = 3
HAS_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
@@ -24,6 +24,7 @@ HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"
# Filename used for testing
+
if os.name == "java": # pragma: no cover
# Jython disallows @ in module names
TESTFN = "$test"
@@ -33,7 +34,7 @@ else:
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
-class DummyLogger(object): # pragma: no cover
+class DummyLogger: # pragma: no cover
def __init__(self):
self.messages = []
@@ -41,7 +42,7 @@ class DummyLogger(object): # pragma: no cover
self.messages.append((severity, message))
-class WarningsRecorder(object): # pragma: no cover
+class WarningsRecorder: # pragma: no cover
"""Convenience wrapper for the warnings list returned on
entry to the warnings.catch_warnings() context manager.
"""
@@ -67,6 +68,7 @@ def _filterwarnings(filters, quiet=False): # pragma: no cover
# in order to re-raise the warnings.
frame = sys._getframe(2)
registry = frame.f_globals.get("__warningregistry__")
+
if registry:
registry.clear()
with warnings.catch_warnings(record=True) as w:
@@ -78,19 +80,25 @@ def _filterwarnings(filters, quiet=False): # pragma: no cover
# Filter the recorded warnings
reraise = list(w)
missing = []
+
for msg, cat in filters:
seen = False
+
for w in reraise[:]:
warning = w.message
# Filter out the matching messages
+
if re.match(msg, str(warning), re.I) and issubclass(warning.__class__, cat):
seen = True
reraise.remove(w)
+
if not seen and not quiet:
# This filter caught nothing
missing.append((msg, cat.__name__))
+
if reraise:
raise AssertionError("unhandled warning %s" % reraise[0])
+
if missing:
raise AssertionError("filter (%r, %s) did not catch any warning" % missing[0])
@@ -111,11 +119,14 @@ def check_warnings(*filters, **kwargs): # pragma: no cover
check_warnings(("", Warning), quiet=True)
"""
quiet = kwargs.get("quiet")
+
if not filters:
filters = (("", Warning),)
# Preserve backward compatibility
+
if quiet is None:
quiet = True
+
return _filterwarnings(filters, quiet)
@@ -130,6 +141,7 @@ def gc_collect(): # pragma: no cover
objects to disappear.
"""
gc.collect()
+
if sys.platform.startswith("java"):
time.sleep(0.1)
gc.collect()
@@ -137,7 +149,7 @@ def gc_collect(): # pragma: no cover
def threading_setup(): # pragma: no cover
- return (compat.thread._count(), None)
+ return (thread._count(), None)
def threading_cleanup(*original_values): # pragma: no cover
@@ -146,7 +158,8 @@ def threading_cleanup(*original_values): # pragma: no cover
_MAX_COUNT = 100
for count in range(_MAX_COUNT):
- values = (compat.thread._count(), None)
+ values = (thread._count(), None)
+
if values == original_values:
break
@@ -186,6 +199,7 @@ def join_thread(thread, timeout=30.0): # pragma: no cover
after timeout seconds.
"""
thread.join(timeout)
+
if thread.is_alive():
msg = "failed to join the thread in %.1f seconds" % timeout
raise AssertionError(msg)
@@ -213,6 +227,7 @@ def bind_port(sock, host=HOST): # pragma: no cover
"tests should never set the SO_REUSEADDR "
"socket option on TCP/IP sockets!"
)
+
if hasattr(socket, "SO_REUSEPORT"):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
@@ -225,11 +240,13 @@ def bind_port(sock, host=HOST): # pragma: no cover
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
+
if hasattr(socket, "SO_EXCLUSIVEADDRUSE"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
+
return port
@@ -303,13 +320,16 @@ def capture_server(evt, buf, serv): # pragma no cover
else:
n = 200
start = time.time()
+
while n > 0 and time.time() - start < 3.0:
r, w, e = select.select([conn], [], [], 0.1)
+
if r:
n -= 1
data = conn.recv(10)
# keep everything except for the newline terminator
buf.write(data.replace(b"\n", b""))
+
if b"\n" in data:
break
time.sleep(0.01)
@@ -332,6 +352,7 @@ def bind_unix_socket(sock, addr): # pragma: no cover
def bind_af_aware(sock, addr):
"""Helper function to bind a socket according to its family."""
+
if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
# Make sure the path doesn't exist.
unlink(addr)
@@ -346,6 +367,7 @@ if sys.platform.startswith("win"): # pragma: no cover
# Perform the operation
func(pathname)
# Now setup the wait loop
+
if waitall:
dirname = pathname
else:
@@ -358,6 +380,7 @@ if sys.platform.startswith("win"): # pragma: no cover
# Testing on an i7@4.3GHz shows that usually only 1 iteration is
# required when contention occurs.
timeout = 0.001
+
while timeout < 1.0:
# Note we are only testing for the existence of the file(s) in
# the contents of the directory regardless of any security or
@@ -367,6 +390,7 @@ if sys.platform.startswith("win"): # pragma: no cover
# Other Windows APIs can fail or give incorrect results when
# dealing with files that are pending deletion.
L = os.listdir(dirname)
+
if not (L if waitall else name in L):
return
# Increase the timeout and try again
@@ -395,17 +419,20 @@ def unlink(filename):
def _is_ipv6_enabled(): # pragma: no cover
"""Check whether IPv6 is enabled on this host."""
+
if compat.HAS_IPV6:
sock = None
try:
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.bind(("::1", 0))
+
return True
- except socket.error:
+ except OSError:
pass
finally:
if sock:
sock.close()
+
return False
@@ -487,6 +514,7 @@ class HelperFunctionTests(unittest.TestCase):
# Only the attribute modified by the routine we expect to be
# called should be True.
+
for attr in attributes:
self.assertEqual(getattr(tobj, attr), attr == expectedattr)
@@ -513,6 +541,7 @@ class HelperFunctionTests(unittest.TestCase):
l = []
testmap = {}
+
for i in range(10):
c = dummychannel()
l.append(c)
@@ -606,6 +635,7 @@ class DispatcherTests(unittest.TestCase):
def test_strerror(self):
# refers to bug #8573
err = asyncore._strerror(errno.EPERM)
+
if hasattr(os, "strerror"):
self.assertEqual(err, os.strerror(errno.EPERM))
err = asyncore._strerror(-1)
@@ -656,6 +686,7 @@ class DispatcherWithSendTests(unittest.TestCase):
d.send(b"\n")
n = 1000
+
while d.out_buffer and n > 0: # pragma: no cover
asyncore.poll()
n -= 1
@@ -723,6 +754,7 @@ class FileWrapperTest(unittest.TestCase):
def test_resource_warning(self):
# Issue #11453
got_warning = False
+
while got_warning is False:
# we try until we get the outcome we want because this
# test is not deterministic (gc_collect() may not
@@ -732,7 +764,7 @@ class FileWrapperTest(unittest.TestCase):
os.close(fd)
try:
- with check_warnings(("", compat.ResourceWarning)):
+ with check_warnings(("", ResourceWarning)):
f = None
gc_collect()
except AssertionError: # pragma: no cover
@@ -819,8 +851,10 @@ class BaseTestAPI:
def loop_waiting_for_flag(self, instance, timeout=5): # pragma: no cover
timeout = float(timeout) / 100
count = 100
+
while asyncore.socket_map and count > 0:
asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
+
if instance.flag:
return
count -= 1
@@ -966,6 +1000,7 @@ class BaseTestAPI:
# Make sure handle_expt is called on OOB data received.
# Note: this might fail on some platforms as OOB data is
# tenuously supported and rarely used.
+
if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
self.skipTest("Not applicable to AF_UNIX sockets.")
@@ -980,7 +1015,7 @@ class BaseTestAPI:
class TestHandler(BaseTestHandler):
def __init__(self, conn):
BaseTestHandler.__init__(self, conn)
- self.socket.send(compat.tobytes(chr(244)), socket.MSG_OOB)
+ self.socket.send(chr(244).encode("latin-1"), socket.MSG_OOB)
server = BaseServer(self.family, self.addr, TestHandler)
client = TestClient(self.family, server.address)
@@ -1082,6 +1117,7 @@ class BaseTestAPI:
@reap_threads
def test_quick_connect(self): # pragma: no cover
# see: http://bugs.python.org/issue10340
+
if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
self.skipTest("test specific to AF_INET and AF_INET6")
@@ -1420,7 +1456,7 @@ class Test_dispatcher(unittest.TestCase):
sock = dummysocket()
def getpeername():
- raise socket.error(errno.EBADF)
+ raise OSError(errno.EBADF)
map = {}
sock.getpeername = getpeername
@@ -1454,7 +1490,7 @@ class Test_dispatcher(unittest.TestCase):
def setsockopt(*arg, **kw):
sock.errored = True
- raise socket.error
+ raise OSError
sock.setsockopt = setsockopt
sock.getsockopt = lambda *arg: 0
@@ -1486,7 +1522,7 @@ class Test_dispatcher(unittest.TestCase):
map = {}
def accept(*arg, **kw):
- raise socket.error(122)
+ raise OSError(122)
sock.accept = accept
inst = self._makeOne(sock=sock, map=map)
@@ -1497,7 +1533,7 @@ class Test_dispatcher(unittest.TestCase):
map = {}
def send(*arg, **kw):
- raise socket.error(errno.EWOULDBLOCK)
+ raise OSError(errno.EWOULDBLOCK)
sock.send = send
inst = self._makeOne(sock=sock, map=map)
@@ -1509,7 +1545,7 @@ class Test_dispatcher(unittest.TestCase):
map = {}
def send(*arg, **kw):
- raise socket.error(122)
+ raise OSError(122)
sock.send = send
inst = self._makeOne(sock=sock, map=map)
@@ -1520,7 +1556,7 @@ class Test_dispatcher(unittest.TestCase):
map = {}
def recv(*arg, **kw):
- raise socket.error(errno.ECONNRESET)
+ raise OSError(errno.ECONNRESET)
def handle_close():
inst.close_handled = True
@@ -1537,7 +1573,7 @@ class Test_dispatcher(unittest.TestCase):
map = {}
def close():
- raise socket.error(122)
+ raise OSError(122)
sock.close = close
inst = self._makeOne(sock=sock, map=map)
@@ -1680,7 +1716,7 @@ class Test_close_all(unittest.TestCase):
self.assertRaises(RuntimeError, self._callFUT, map)
-class DummyDispatcher(object):
+class DummyDispatcher:
read_event_handled = False
write_event_handled = False
expt_event_handled = False
@@ -1693,16 +1729,19 @@ class DummyDispatcher(object):
def handle_read_event(self):
self.read_event_handled = True
+
if self.exc is not None:
raise self.exc
def handle_write_event(self):
self.write_event_handled = True
+
if self.exc is not None:
raise self.exc
def handle_expt_event(self):
self.expt_event_handled = True
+
if self.exc is not None:
raise self.exc
@@ -1723,7 +1762,7 @@ class DummyDispatcher(object):
raise self.exc
-class DummyTime(object):
+class DummyTime:
def __init__(self):
self.sleepvals = []
@@ -1731,7 +1770,7 @@ class DummyTime(object):
self.sleepvals.append(val)
-class DummySelect(object):
+class DummySelect:
error = select.error
def __init__(self, exc=None, pollster=None):
@@ -1741,6 +1780,7 @@ class DummySelect(object):
def select(self, *arg):
self.selected.append(arg)
+
if self.exc is not None:
raise self.exc
@@ -1748,13 +1788,14 @@ class DummySelect(object):
return self.pollster
-class DummyPollster(object):
+class DummyPollster:
def __init__(self, exc=None):
self.polled = []
self.exc = exc
def poll(self, timeout):
self.polled.append(timeout)
+
if self.exc is not None:
raise self.exc
else: # pragma: no cover