summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2015-12-02 14:52:25 -0800
committerAsk Solem <ask@celeryproject.org>2015-12-07 00:27:54 -0800
commit09e46c99342492ef3161cb6b803a73406d41e88c (patch)
treee639446ebb73f69dfae6654ce69f9e5c792c3fb4
parentb85b811fae0084e30bbfce9e41c7ea872d8b7718 (diff)
downloadkombu-09e46c99342492ef3161cb6b803a73406d41e88c.tar.gz
Hub: Make sure fd is unregistered if register fails
-rw-r--r--kombu/async/hub.py18
1 files changed, 11 insertions, 7 deletions
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index 6c632fd4..04cd6745 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -146,12 +146,18 @@ class Hub(object):
logger.error('Error in timer: %r', exc, exc_info=1)
return min(delay or min_delay, max_delay)
+ def _remove_from_loop(self, fd):
+ try:
+ self._unregister(fd)
+ finally:
+ self._discard(fd)
+
def add(self, fd, callback, flags, args=(), consolidate=False):
fd = fileno(fd)
try:
self.poller.register(fd, flags)
except ValueError:
- self._discard(fd)
+ self._remove_from_loop(fd)
raise
else:
dest = self.readers if flags & READ else self.writers
@@ -163,8 +169,7 @@ class Hub(object):
def remove(self, fd):
fd = fileno(fd)
- self._unregister(fd)
- self._discard(fd)
+ self._remove_from_loop(fd)
def run_forever(self):
self._running = True
@@ -207,8 +212,7 @@ class Hub(object):
writable = fd in self.writers
on_write = self.writers.get(fd)
try:
- self._unregister(fd)
- self._discard(fd)
+ self._remove_from_loop(fd)
finally:
if writable:
cb, args = on_write
@@ -218,8 +222,7 @@ class Hub(object):
readable = fd in self.readers
on_read = self.readers.get(fd)
try:
- self._unregister(fd)
- self._discard(fd)
+ self._remove_from_loop(fd)
finally:
if readable:
cb, args = on_read
@@ -306,6 +309,7 @@ class Hub(object):
pass
if cb is None:
+ self.remove(fd)
continue
if isinstance(cb, generator):
try: