summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoger Hu <rhu@hearsaycorp.com>2014-05-08 00:27:32 +0000
committerAsk Solem <ask@celeryproject.org>2014-05-19 18:37:01 +0100
commit0b05298377488e411e3204d0e93bbad62b1fd68a (patch)
treeba7512b9405da8ab61a44621a21ca9bb9d79cb56
parenteae4dba4f298501a1b12119c3161912efc8edbb4 (diff)
downloadkombu-0b05298377488e411e3204d0e93bbad62b1fd68a.tar.gz
Be selective about how file descriptors are removed since they may be reused for a different purpose.
Kombu was removing them after they were being reused by another worker process.
-rw-r--r--kombu/async/hub.py30
1 files changed, 18 insertions, 12 deletions
diff --git a/kombu/async/hub.py b/kombu/async/hub.py
index 673a60ac..306fdd31 100644
--- a/kombu/async/hub.py
+++ b/kombu/async/hub.py
@@ -287,20 +287,26 @@ class Hub(object):
to_consolidate.append(fileno)
continue
cb = cbargs = None
- try:
- if event & READ:
+
+ if event & READ:
+ try:
cb, cbargs = readers[fileno]
- elif event & WRITE:
+ except KeyError:
+ self.remove_reader(fileno)
+ continue
+ elif event & WRITE:
+ try:
cb, cbargs = writers[fileno]
- elif event & ERR:
- try:
- cb, cbargs = (readers.get(fileno) or
- writers.get(fileno))
- except TypeError:
- pass
- except (KeyError, Empty):
- hub_remove(fileno)
- continue
+ except KeyError:
+ self.remove_writer(fileno)
+ continue
+ elif event & ERR:
+ try:
+ cb, cbargs = (readers.get(fileno) or
+ writers.get(fileno))
+ except TypeError:
+ pass
+
if cb is None:
continue
if isinstance(cb, generator):