summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Shepelev <temotor@gmail.com>2013-07-02 18:29:48 +0400
committerSergey Shepelev <temotor@gmail.com>2013-07-02 18:29:48 +0400
commitb87ed20e8c45e4498d00f730586b69795496787f (patch)
treed704e9822655bfca6398e5aa910e539a938b498e
parent747b753a2018b14a63511c1d456c95f05dcc2d9e (diff)
downloadeventlet-b87ed20e8c45e4498d00f730586b69795496787f.tar.gz
hubs: get_default_hub() on Windows broken by kqueue; Thanks to Paul Oppenheim
Fixes https://github.com/eventlet/eventlet/issues/38 +autopep8
-rw-r--r--eventlet/hubs/__init__.py16
-rw-r--r--eventlet/hubs/kqueue.py8
-rw-r--r--tests/hub_test.py110
3 files changed, 99 insertions, 35 deletions
diff --git a/eventlet/hubs/__init__.py b/eventlet/hubs/__init__.py
index bdc6f3c..9b4c2b7 100644
--- a/eventlet/hubs/__init__.py
+++ b/eventlet/hubs/__init__.py
@@ -16,12 +16,13 @@ __all__ = ["use_hub", "get_hub", "get_default_hub", "trampoline"]
threading = patcher.original('threading')
_threadlocal = threading.local()
+
def get_default_hub():
"""Select the default hub implementation based on what multiplexing
libraries are installed. The order that the hubs are tried is:
- * twistedr
* epoll
+ * kqueue
* poll
* select
@@ -33,10 +34,10 @@ def get_default_hub():
"""
# pyevent hub disabled for now because it is not thread-safe
- #try:
+ # try:
# import eventlet.hubs.pyevent
# return eventlet.hubs.pyevent
- #except:
+ # except:
# pass
select = patcher.original('select')
@@ -91,12 +92,14 @@ def use_hub(mod=None):
mod, found = entry.load(), True
break
if not found:
- mod = __import__('eventlet.hubs.' + mod, globals(), locals(), ['Hub'])
+ mod = __import__(
+ 'eventlet.hubs.' + mod, globals(), locals(), ['Hub'])
if hasattr(mod, 'Hub'):
_threadlocal.Hub = mod.Hub
else:
_threadlocal.Hub = mod
+
def get_hub():
"""Get the current event hub singleton object.
@@ -113,6 +116,8 @@ def get_hub():
return hub
from eventlet import timeout
+
+
def trampoline(fd, read=None, write=None, timeout=None,
timeout_exc=timeout.Timeout):
"""Suspend the current coroutine until the given socket object or file
@@ -133,7 +138,8 @@ def trampoline(fd, read=None, write=None, timeout=None,
hub = get_hub()
current = greenlet.getcurrent()
assert hub.greenlet is not current, 'do not call blocking functions from the mainloop'
- assert not (read and write), 'not allowed to trampoline for reading and writing'
+ assert not (
+ read and write), 'not allowed to trampoline for reading and writing'
try:
fileno = fd.fileno()
except AttributeError:
diff --git a/eventlet/hubs/kqueue.py b/eventlet/hubs/kqueue.py
index b24df83..9fbc9e2 100644
--- a/eventlet/hubs/kqueue.py
+++ b/eventlet/hubs/kqueue.py
@@ -8,9 +8,15 @@ sleep = time.sleep
from eventlet.support import get_errno, clear_sys_exc_info
from eventlet.hubs.hub import BaseHub, READ, WRITE, noop
+
+if getattr(select, 'kqueue', None) is None:
+ raise ImportError('No kqueue implementation found in select module')
+
+
FILTERS = {READ: select.KQ_FILTER_READ,
WRITE: select.KQ_FILTER_WRITE}
+
class Hub(BaseHub):
MAX_EVENTS = 100
@@ -47,7 +53,7 @@ class Hub(BaseHub):
if evtype not in events:
try:
event = select.kevent(fileno,
- FILTERS.get(evtype), select.KQ_EV_ADD)
+ FILTERS.get(evtype), select.KQ_EV_ADD)
self._control([event], 0, 0)
events[evtype] = event
except ValueError:
diff --git a/tests/hub_test.py b/tests/hub_test.py
index 6fd1e3b..d062304 100644
--- a/tests/hub_test.py
+++ b/tests/hub_test.py
@@ -2,6 +2,7 @@ from __future__ import with_statement
import sys
from tests import LimitedTestCase, main, skip_with_pyevent, skip_if_no_itimer, skip_unless
+from tests.patcher_test import ProcessBase
import time
import eventlet
from eventlet import hubs
@@ -11,11 +12,15 @@ from eventlet.semaphore import Semaphore
from eventlet.support import greenlets
DELAY = 0.001
+
+
def noop():
pass
+
class TestTimerCleanup(LimitedTestCase):
TEST_TIMEOUT = 2
+
@skip_with_pyevent
def test_cancel_immediate(self):
hub = hubs.get_hub()
@@ -25,12 +30,11 @@ class TestTimerCleanup(LimitedTestCase):
t = hubs.get_hub().schedule_call_global(60, noop)
t.cancel()
self.assert_less_than_equal(hub.timers_canceled,
- hub.get_timers_count() + 1)
+ hub.get_timers_count() + 1)
# there should be fewer than 1000 new timers and canceled
self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
self.assert_less_than_equal(hub.timers_canceled, 1000)
-
@skip_with_pyevent
def test_cancel_accumulated(self):
hub = hubs.get_hub()
@@ -40,10 +44,10 @@ class TestTimerCleanup(LimitedTestCase):
t = hubs.get_hub().schedule_call_global(60, noop)
eventlet.sleep()
self.assert_less_than_equal(hub.timers_canceled,
- hub.get_timers_count() + 1)
+ hub.get_timers_count() + 1)
t.cancel()
self.assert_less_than_equal(hub.timers_canceled,
- hub.get_timers_count() + 1, hub.timers)
+ hub.get_timers_count() + 1, hub.timers)
# there should be fewer than 1000 new timers and canceled
self.assert_less_than_equal(hub.get_timers_count(), 1000 + stimers)
self.assert_less_than_equal(hub.timers_canceled, 1000)
@@ -78,34 +82,36 @@ class TestTimerCleanup(LimitedTestCase):
self.assert_less_than_equal(hub.timers_canceled,
hub.get_timers_count())
eventlet.sleep()
-
+
class TestScheduleCall(LimitedTestCase):
+
def test_local(self):
lst = [1]
eventlet.spawn(hubs.get_hub().schedule_call_local, DELAY, lst.pop)
eventlet.sleep(0)
- eventlet.sleep(DELAY*2)
+ eventlet.sleep(DELAY * 2)
assert lst == [1], lst
def test_global(self):
lst = [1]
eventlet.spawn(hubs.get_hub().schedule_call_global, DELAY, lst.pop)
eventlet.sleep(0)
- eventlet.sleep(DELAY*2)
+ eventlet.sleep(DELAY * 2)
assert lst == [], lst
-
+
def test_ordering(self):
lst = []
- hubs.get_hub().schedule_call_global(DELAY*2, lst.append, 3)
+ hubs.get_hub().schedule_call_global(DELAY * 2, lst.append, 3)
hubs.get_hub().schedule_call_global(DELAY, lst.append, 1)
hubs.get_hub().schedule_call_global(DELAY, lst.append, 2)
while len(lst) < 3:
eventlet.sleep(DELAY)
- self.assertEquals(lst, [1,2,3])
+ self.assertEquals(lst, [1, 2, 3])
+
-
class TestDebug(LimitedTestCase):
+
def test_debug_listeners(self):
hubs.get_hub().set_debug_listeners(True)
hubs.get_hub().set_debug_listeners(False)
@@ -113,19 +119,23 @@ class TestDebug(LimitedTestCase):
def test_timer_exceptions(self):
hubs.get_hub().set_timer_exceptions(True)
hubs.get_hub().set_timer_exceptions(False)
-
+
class TestExceptionInMainloop(LimitedTestCase):
+
def test_sleep(self):
- # even if there was an error in the mainloop, the hub should continue to work
+ # even if there was an error in the mainloop, the hub should continue
+ # to work
start = time.time()
eventlet.sleep(DELAY)
delay = time.time() - start
- assert delay >= DELAY*0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY)
+ assert delay >= DELAY * \
+ 0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
+ delay, DELAY)
def fail():
- 1//0
+ 1 // 0
hubs.get_hub().schedule_call_global(0, fail)
@@ -133,10 +143,13 @@ class TestExceptionInMainloop(LimitedTestCase):
eventlet.sleep(DELAY)
delay = time.time() - start
- assert delay >= DELAY*0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (delay, DELAY)
+ assert delay >= DELAY * \
+ 0.9, 'sleep returned after %s seconds (was scheduled for %s)' % (
+ delay, DELAY)
class TestExceptionInGreenthread(LimitedTestCase):
+
@skip_unless(greenlets.preserves_excinfo)
def test_exceptionpreservation(self):
# events for controlling execution order
@@ -193,6 +206,7 @@ class TestExceptionInGreenthread(LimitedTestCase):
class TestHubSelection(LimitedTestCase):
+
def test_explicit_hub(self):
if getattr(hubs.get_hub(), 'uses_twisted_reactor', None):
# doesn't work with twisted
@@ -207,6 +221,7 @@ class TestHubSelection(LimitedTestCase):
class TestHubBlockingDetector(LimitedTestCase):
TEST_TIMEOUT = 10
+
@skip_with_pyevent
def test_block_detect(self):
def look_im_blocking():
@@ -230,10 +245,11 @@ class TestHubBlockingDetector(LimitedTestCase):
gt = eventlet.spawn(look_im_blocking)
self.assertRaises(RuntimeError, gt.wait)
debug.hub_blocking_detection(False)
-
+
class TestSuspend(LimitedTestCase):
- TEST_TIMEOUT=3
+ TEST_TIMEOUT = 3
+
def test_suspend_doesnt_crash(self):
import errno
import os
@@ -256,11 +272,11 @@ except eventlet.Timeout:
python_path = os.pathsep.join(sys.path + [self.tempdir])
new_env = os.environ.copy()
new_env['PYTHONPATH'] = python_path
- p = subprocess.Popen([sys.executable,
+ p = subprocess.Popen([sys.executable,
os.path.join(self.tempdir, filename)],
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=new_env)
eventlet.sleep(0.4) # wait for process to hit accept
- os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR
+ os.kill(p.pid, signal.SIGSTOP) # suspend and resume to generate EINTR
os.kill(p.pid, signal.SIGCONT)
output, _ = p.communicate()
lines = [l for l in output.split("\n") if l]
@@ -269,15 +285,16 @@ except eventlet.Timeout:
class TestBadFilenos(LimitedTestCase):
+
@skip_with_pyevent
def test_repeated_selects(self):
from eventlet.green import select
self.assertRaises(ValueError, select.select, [-1], [], [])
self.assertRaises(ValueError, select.select, [-1], [], [])
-
-from tests.patcher_test import ProcessBase
+
class TestFork(ProcessBase):
+
@skip_with_pyevent
def test_fork(self):
new_mod = """
@@ -297,7 +314,7 @@ if not pid:
new_sock, address = server.accept()
except eventlet.Timeout, t:
print "accept blocked"
-
+
else:
kpid, status = os.wait()
assert kpid == pid
@@ -312,7 +329,7 @@ else:
class TestDeadRunLoop(LimitedTestCase):
- TEST_TIMEOUT=2
+ TEST_TIMEOUT = 2
class CustomException(Exception):
pass
@@ -330,7 +347,7 @@ class TestDeadRunLoop(LimitedTestCase):
eventlet.sleep(0) # let dummyproc run
assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
- KeyboardInterrupt())
+ KeyboardInterrupt())
# kill dummyproc, this schedules a timer to return execution to
# this greenlet before throwing an exception in dummyproc.
@@ -355,7 +372,7 @@ class TestDeadRunLoop(LimitedTestCase):
g = eventlet.spawn(dummyproc)
assert hub.greenlet.parent == eventlet.greenthread.getcurrent()
self.assertRaises(KeyboardInterrupt, hub.greenlet.throw,
- KeyboardInterrupt())
+ KeyboardInterrupt())
assert not g.dead # check dummyproc hasn't completed
with eventlet.Timeout(0.5, self.CustomException()):
@@ -370,6 +387,41 @@ class TestDeadRunLoop(LimitedTestCase):
class Foo(object):
pass
-if __name__=='__main__':
- main()
+class TestDefaultHub(ProcessBase):
+
+ def test_kqueue_unsupported(self):
+ # https://github.com/eventlet/eventlet/issues/38
+ # get_hub on windows broken by kqueue
+ module_source = r'''
+# Simulate absence of kqueue even on platforms that support it.
+import select
+try:
+ del select.kqueue
+except AttributeError:
+ pass
+
+import __builtin__
+original_import = __builtin__.__import__
+
+def fail_import(name, *args, **kwargs):
+ if 'epoll' in name:
+ raise ImportError('disabled for test')
+ if 'kqueue' in name:
+ print('kqueue tried')
+ return original_import(name, *args, **kwargs)
+
+__builtin__.__import__ = fail_import
+
+
+import eventlet.hubs
+eventlet.hubs.get_default_hub()
+print('ok')
+'''
+ self.write_to_tempfile('newmod', module_source)
+ output, _ = self.launch_subprocess('newmod.py')
+ self.assertEqual(output, 'kqueue tried\nok\n')
+
+
+if __name__ == '__main__':
+ main()