summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Merritt <sam@swiftstack.com>2018-06-07 20:45:09 -0700
committerClay Gerrard <clay.gerrard@gmail.com>2020-09-23 10:16:27 -0500
commit9f49f0bbb9c18d7478d7c5e2e677ae6452845ab5 (patch)
tree6ca30c45eef826902b315970027f1ac330c104fc
parent004cc4e6c838c5c218d3a5adb8043bb2afd1cb94 (diff)
downloadeventlet-9f49f0bbb9c18d7478d7c5e2e677ae6452845ab5.tar.gz
Always remove the right listener from the hub
When in hubs.trampoline(fd, ...), a greenthread registers itself as a listener for fd, switches to the hub, and then calls hub.remove(listener) to deregister itself. hub.remove(listener) removes the primary listener. If the greenthread awoke because its fd became ready, then it is the primary listener, and everything is fine. However, if the greenthread was a secondary listener and awoke because a Timeout fired then it would remove the primary and promote a random secondary to primary. This commit makes hub.remove(listener) check to make sure listener is the primary, and if it's not, remove the listener from the secondaries.
-rw-r--r--eventlet/hubs/hub.py26
-rw-r--r--tests/hub_test.py47
2 files changed, 63 insertions, 10 deletions
diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py
index 8871082..7c356f2 100644
--- a/eventlet/hubs/hub.py
+++ b/eventlet/hubs/hub.py
@@ -225,15 +225,23 @@ class BaseHub(object):
fileno = listener.fileno
evtype = listener.evtype
- self.listeners[evtype].pop(fileno, None)
- # migrate a secondary listener to be the primary listener
- if fileno in self.secondaries[evtype]:
- sec = self.secondaries[evtype].get(fileno, None)
- if not sec:
- return
- self.listeners[evtype][fileno] = sec.pop(0)
- if not sec:
- del self.secondaries[evtype][fileno]
+ if listener is self.listeners[evtype].get(fileno):
+ self.listeners[evtype].pop(fileno, None)
+ # migrate a secondary listener to be the primary listener
+ if fileno in self.secondaries[evtype]:
+ sec = self.secondaries[evtype].get(fileno, None)
+ if not sec:
+ return
+ self.listeners[evtype][fileno] = sec.pop(0)
+ if not sec:
+ del self.secondaries[evtype][fileno]
+ else:
+ sec = [l for l in self.secondaries[evtype].get(fileno, [])
+ if l is not listener]
+ if sec:
+ self.secondaries[evtype][fileno] = sec
+ else:
+ self.secondaries[evtype].pop(fileno, None)
def mark_as_reopened(self, fileno):
""" If a file descriptor is returned by the OS as the result of some
diff --git a/tests/hub_test.py b/tests/hub_test.py
index d62b805..8b9bbe9 100644
--- a/tests/hub_test.py
+++ b/tests/hub_test.py
@@ -1,11 +1,14 @@
from __future__ import with_statement
+import errno
+import fcntl
+import os
import sys
import time
import tests
from tests import skip_with_pyevent, skip_if_no_itimer, skip_unless
import eventlet
-from eventlet import hubs
+from eventlet import debug, hubs
from eventlet.support import greenlets
import six
@@ -83,6 +86,48 @@ class TestTimerCleanup(tests.LimitedTestCase):
eventlet.sleep()
+class TestMultipleListenersCleanup(tests.LimitedTestCase):
+ def setUp(self):
+ super(TestMultipleListenersCleanup, self).setUp()
+ debug.hub_prevent_multiple_readers(False)
+ debug.hub_exceptions(False)
+
+ def tearDown(self):
+ super(TestMultipleListenersCleanup, self).tearDown()
+ debug.hub_prevent_multiple_readers(True)
+ debug.hub_exceptions(True)
+
+ def test_cleanup(self):
+ r, w = os.pipe()
+ self.addCleanup(os.close, r)
+ self.addCleanup(os.close, w)
+
+ fcntl.fcntl(r, fcntl.F_SETFL,
+ fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+ def readfd(fd):
+ while True:
+ try:
+ return os.read(fd, 1)
+ except OSError as e:
+ if e.errno != errno.EAGAIN:
+ raise
+ hubs.trampoline(fd, read=True)
+
+ first_listener = eventlet.spawn(readfd, r)
+ eventlet.sleep()
+
+ second_listener = eventlet.spawn(readfd, r)
+ eventlet.sleep()
+
+ hubs.get_hub().schedule_call_global(0, second_listener.throw,
+ eventlet.Timeout(None))
+ eventlet.sleep()
+
+ os.write(w, b'.')
+ self.assertEqual(first_listener.wait(), b'.')
+
+
class TestScheduleCall(tests.LimitedTestCase):
def test_local(self):