summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2015-11-02 10:57:17 -0800
committerAsk Solem <ask@celeryproject.org>2015-11-02 10:57:17 -0800
commit5fa2792c82c60ec0da3f2d64657304daa4e50f10 (patch)
treed2d7edbdfa5569c0fe3815970cb6923105cf71de
parent991238c960284dba3f690317366134fe772b36cf (diff)
downloadkombu-5fa2792c82c60ec0da3f2d64657304daa4e50f10.tar.gz
Redis: Verify fanout client is registered. Closes #534
-rw-r--r--kombu/tests/transport/test_redis.py1
-rw-r--r--kombu/transport/redis.py10
2 files changed, 7 insertions, 4 deletions
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py
index d2b07f85..4b72e1be 100644
--- a/kombu/tests/transport/test_redis.py
+++ b/kombu/tests/transport/test_redis.py
@@ -1138,6 +1138,7 @@ class test_MultiChannelPoller(Case):
self.assertEqual(channel._subscribe.call_count, 1)
channel._in_listen = True
+ p._chan_to_sock[(channel, channel.subclient, 'LISTEN')] = 3
channel.subclient.connection._sock = Mock()
p._register_LISTEN(channel)
self.assertEqual(p._register.call_count, 1)
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 42e4b999..cca559f9 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -275,20 +275,22 @@ class MultiChannelPoller(object):
def _unregister(self, channel, client, type):
self.poller.unregister(self._chan_to_sock[(channel, client, type)])
+ def _client_registered(self, channel, client, cmd):
+ return (client.connection._sock is not None and
+ (channel, client, cmd) in self._chan_to_sock)
+
def _register_BRPOP(self, channel):
"""enable BRPOP mode for channel."""
ident = channel, channel.client, 'BRPOP'
- if channel.client.connection._sock is None or \
- ident not in self._chan_to_sock:
+ if not self._client_registered(channel, channel.client, 'BRPOP'):
channel._in_poll = False
self._register(*ident)
-
if not channel._in_poll: # send BRPOP
channel._brpop_start()
def _register_LISTEN(self, channel):
"""enable LISTEN mode for channel."""
- if channel.subclient.connection._sock is None:
+ if not self._client_registered(channel, channel.subclient, 'LISTEN'):
channel._in_listen = False
self._register(channel, channel.subclient, 'LISTEN')
if not channel._in_listen: