diff options
author | Ask Solem <ask@celeryproject.org> | 2015-12-02 14:52:25 -0800 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2015-12-07 00:27:54 -0800 |
commit | 09e46c99342492ef3161cb6b803a73406d41e88c (patch) | |
tree | e639446ebb73f69dfae6654ce69f9e5c792c3fb4 | |
parent | b85b811fae0084e30bbfce9e41c7ea872d8b7718 (diff) | |
download | kombu-09e46c99342492ef3161cb6b803a73406d41e88c.tar.gz |
Hub: Make sure fd is unregistered if register fails
-rw-r--r-- | kombu/async/hub.py | 18 |
1 files changed, 11 insertions, 7 deletions
diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 6c632fd4..04cd6745 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -146,12 +146,18 @@ class Hub(object): logger.error('Error in timer: %r', exc, exc_info=1) return min(delay or min_delay, max_delay) + def _remove_from_loop(self, fd): + try: + self._unregister(fd) + finally: + self._discard(fd) + def add(self, fd, callback, flags, args=(), consolidate=False): fd = fileno(fd) try: self.poller.register(fd, flags) except ValueError: - self._discard(fd) + self._remove_from_loop(fd) raise else: dest = self.readers if flags & READ else self.writers @@ -163,8 +169,7 @@ class Hub(object): def remove(self, fd): fd = fileno(fd) - self._unregister(fd) - self._discard(fd) + self._remove_from_loop(fd) def run_forever(self): self._running = True @@ -207,8 +212,7 @@ class Hub(object): writable = fd in self.writers on_write = self.writers.get(fd) try: - self._unregister(fd) - self._discard(fd) + self._remove_from_loop(fd) finally: if writable: cb, args = on_write @@ -218,8 +222,7 @@ class Hub(object): readable = fd in self.readers on_read = self.readers.get(fd) try: - self._unregister(fd) - self._discard(fd) + self._remove_from_loop(fd) finally: if readable: cb, args = on_read @@ -306,6 +309,7 @@ class Hub(object): pass if cb is None: + self.remove(fd) continue if isinstance(cb, generator): try: |