diff options
author | Samuel Merritt <sam@swiftstack.com> | 2018-06-07 20:45:09 -0700 |
---|---|---|
committer | Clay Gerrard <clay.gerrard@gmail.com> | 2020-09-23 10:16:27 -0500 |
commit | 9f49f0bbb9c18d7478d7c5e2e677ae6452845ab5 (patch) | |
tree | 6ca30c45eef826902b315970027f1ac330c104fc | |
parent | 004cc4e6c838c5c218d3a5adb8043bb2afd1cb94 (diff) | |
download | eventlet-9f49f0bbb9c18d7478d7c5e2e677ae6452845ab5.tar.gz |
Always remove the right listener from the hub
When in hubs.trampoline(fd, ...), a greenthread registers itself as a
listener for fd, switches to the hub, and then calls
hub.remove(listener) to deregister itself. hub.remove(listener)
removes the primary listener. If the greenthread awoke because its fd
became ready, then it is the primary listener, and everything is
fine. However, if the greenthread was a secondary listener and awoke
because a Timeout fired then it would remove the primary and promote a
random secondary to primary.
This commit makes hub.remove(listener) check to make sure listener is
the primary, and if it's not, remove the listener from the
secondaries.
-rw-r--r-- | eventlet/hubs/hub.py | 26 | ||||
-rw-r--r-- | tests/hub_test.py | 47 |
2 files changed, 63 insertions, 10 deletions
diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index 8871082..7c356f2 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -225,15 +225,23 @@ class BaseHub(object): fileno = listener.fileno evtype = listener.evtype - self.listeners[evtype].pop(fileno, None) - # migrate a secondary listener to be the primary listener - if fileno in self.secondaries[evtype]: - sec = self.secondaries[evtype].get(fileno, None) - if not sec: - return - self.listeners[evtype][fileno] = sec.pop(0) - if not sec: - del self.secondaries[evtype][fileno] + if listener is self.listeners[evtype].get(fileno): + self.listeners[evtype].pop(fileno, None) + # migrate a secondary listener to be the primary listener + if fileno in self.secondaries[evtype]: + sec = self.secondaries[evtype].get(fileno, None) + if not sec: + return + self.listeners[evtype][fileno] = sec.pop(0) + if not sec: + del self.secondaries[evtype][fileno] + else: + sec = [l for l in self.secondaries[evtype].get(fileno, []) + if l is not listener] + if sec: + self.secondaries[evtype][fileno] = sec + else: + self.secondaries[evtype].pop(fileno, None) def mark_as_reopened(self, fileno): """ If a file descriptor is returned by the OS as the result of some diff --git a/tests/hub_test.py b/tests/hub_test.py index d62b805..8b9bbe9 100644 --- a/tests/hub_test.py +++ b/tests/hub_test.py @@ -1,11 +1,14 @@ from __future__ import with_statement +import errno +import fcntl +import os import sys import time import tests from tests import skip_with_pyevent, skip_if_no_itimer, skip_unless import eventlet -from eventlet import hubs +from eventlet import debug, hubs from eventlet.support import greenlets import six @@ -83,6 +86,48 @@ class TestTimerCleanup(tests.LimitedTestCase): eventlet.sleep() +class TestMultipleListenersCleanup(tests.LimitedTestCase): + def setUp(self): + super(TestMultipleListenersCleanup, self).setUp() + debug.hub_prevent_multiple_readers(False) + debug.hub_exceptions(False) + + def tearDown(self): + super(TestMultipleListenersCleanup, self).tearDown() + debug.hub_prevent_multiple_readers(True) + debug.hub_exceptions(True) + + def test_cleanup(self): + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + + fcntl.fcntl(r, fcntl.F_SETFL, + fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK) + + def readfd(fd): + while True: + try: + return os.read(fd, 1) + except OSError as e: + if e.errno != errno.EAGAIN: + raise + hubs.trampoline(fd, read=True) + + first_listener = eventlet.spawn(readfd, r) + eventlet.sleep() + + second_listener = eventlet.spawn(readfd, r) + eventlet.sleep() + + hubs.get_hub().schedule_call_global(0, second_listener.throw, + eventlet.Timeout(None)) + eventlet.sleep() + + os.write(w, b'.') + self.assertEqual(first_listener.wait(), b'.') + + class TestScheduleCall(tests.LimitedTestCase): def test_local(self): |