diff options
author | Roger Hu <rhu@hearsaycorp.com> | 2014-05-08 00:27:32 +0000 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2014-05-19 18:37:01 +0100 |
commit | 0b05298377488e411e3204d0e93bbad62b1fd68a (patch) | |
tree | ba7512b9405da8ab61a44621a21ca9bb9d79cb56 | |
parent | eae4dba4f298501a1b12119c3161912efc8edbb4 (diff) | |
download | kombu-0b05298377488e411e3204d0e93bbad62b1fd68a.tar.gz |
Be selective about how file descriptors are removed since they may be reused for a different purpose.
Kombu was removing them after they were being reused by another worker process.
-rw-r--r-- | kombu/async/hub.py | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/kombu/async/hub.py b/kombu/async/hub.py index 673a60ac..306fdd31 100644 --- a/kombu/async/hub.py +++ b/kombu/async/hub.py @@ -287,20 +287,26 @@ class Hub(object): to_consolidate.append(fileno) continue cb = cbargs = None - try: - if event & READ: + + if event & READ: + try: cb, cbargs = readers[fileno] - elif event & WRITE: + except KeyError: + self.remove_reader(fileno) + continue + elif event & WRITE: + try: cb, cbargs = writers[fileno] - elif event & ERR: - try: - cb, cbargs = (readers.get(fileno) or - writers.get(fileno)) - except TypeError: - pass - except (KeyError, Empty): - hub_remove(fileno) - continue + except KeyError: + self.remove_writer(fileno) + continue + elif event & ERR: + try: + cb, cbargs = (readers.get(fileno) or + writers.get(fileno)) + except TypeError: + pass + if cb is None: continue if isinstance(cb, generator): |