diff options
author | Ask Solem <ask@celeryproject.org> | 2015-11-02 10:57:17 -0800 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2015-11-02 10:57:17 -0800 |
commit | 5fa2792c82c60ec0da3f2d64657304daa4e50f10 (patch) | |
tree | d2d7edbdfa5569c0fe3815970cb6923105cf71de | |
parent | 991238c960284dba3f690317366134fe772b36cf (diff) | |
download | kombu-5fa2792c82c60ec0da3f2d64657304daa4e50f10.tar.gz |
Redis: Verify fanout client is registered. Closes #534
-rw-r--r-- | kombu/tests/transport/test_redis.py | 1 | ||||
-rw-r--r-- | kombu/transport/redis.py | 10 |
2 files changed, 7 insertions, 4 deletions
diff --git a/kombu/tests/transport/test_redis.py b/kombu/tests/transport/test_redis.py index d2b07f85..4b72e1be 100644 --- a/kombu/tests/transport/test_redis.py +++ b/kombu/tests/transport/test_redis.py @@ -1138,6 +1138,7 @@ class test_MultiChannelPoller(Case): self.assertEqual(channel._subscribe.call_count, 1) channel._in_listen = True + p._chan_to_sock[(channel, channel.subclient, 'LISTEN')] = 3 channel.subclient.connection._sock = Mock() p._register_LISTEN(channel) self.assertEqual(p._register.call_count, 1) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 42e4b999..cca559f9 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -275,20 +275,22 @@ class MultiChannelPoller(object): def _unregister(self, channel, client, type): self.poller.unregister(self._chan_to_sock[(channel, client, type)]) + def _client_registered(self, channel, client, cmd): + return (client.connection._sock is not None and + (channel, client, cmd) in self._chan_to_sock) + def _register_BRPOP(self, channel): """enable BRPOP mode for channel.""" ident = channel, channel.client, 'BRPOP' - if channel.client.connection._sock is None or \ - ident not in self._chan_to_sock: + if not self._client_registered(channel, channel.client, 'BRPOP'): channel._in_poll = False self._register(*ident) - if not channel._in_poll: # send BRPOP channel._brpop_start() def _register_LISTEN(self, channel): """enable LISTEN mode for channel.""" - if channel.subclient.connection._sock is None: + if not self._client_registered(channel, channel.subclient, 'LISTEN'): channel._in_listen = False self._register(channel, channel.subclient, 'LISTEN') if not channel._in_listen: |