summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsh Berlin-Taylor <ash_github@firemirror.com>2018-11-19 17:33:50 +0000
committerOmer Katz <omer.drow@gmail.com>2018-11-19 19:33:50 +0200
commit05152da53d169371b8b87718f33a6f47e97b7ba0 (patch)
treee65312a4b2afbee858c258b2636e2f594a6dde83
parentd9de66bd4ee86a1c47f546f0a4668bd163c4a929 (diff)
downloadkombu-05152da53d169371b8b87718f33a6f47e97b7ba0.tar.gz
Support redis-py v2 and v3 (#948)
Further to #946 this fixes the underlying issue in a easy-to-upgrade way for end users, many of whom will have Redis installed via other means. By having this check here and supporting both versions concurrently it makes it easier for end users, and to use celery/kombu in projects that use Redis elsewhere. With this change it is possibly worth reverting #946
-rw-r--r--kombu/transport/redis.py9
-rw-r--r--t/unit/transport/test_redis.py27
2 files changed, 32 insertions, 4 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 57ce7b98..23c086b8 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -146,8 +146,15 @@ class QoS(virtual.QoS):
def append(self, message, delivery_tag):
delivery = message.delivery_info
EX, RK = delivery['exchange'], delivery['routing_key']
+ # TODO: Remove this once we soley on Redis-py 3.0.0+
+ if redis.VERSION[0] >= 3:
+ # Redis-py changed the format of zadd args in v3.0.0
+ zadd_args = [{time(): delivery_tag}]
+ else:
+ zadd_args = [time(), delivery_tag]
+
with self.pipe_or_acquire() as pipe:
- pipe.zadd(self.unacked_index_key, time(), delivery_tag) \
+ pipe.zadd(self.unacked_index_key, *zadd_args) \
.hset(self.unacked_key, delivery_tag,
dumps([message._raw, EX, RK])) \
.execute()
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 29d60cda..d15de986 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -75,8 +75,15 @@ class Client(object):
def sadd(self, key, member, *args):
self.sets[key].add(member)
- def zadd(self, key, score1, member1, *args):
- self.sets[key].add(member1)
+ def zadd(self, key, *args):
+ if redis.redis.VERSION[0] >= 3:
+ (mapping,) = args
+ for item in mapping:
+ self.sets[key].add(item)
+ else:
+ # TODO: remove me when we drop support for Redis-py v2
+ (score1, member1) = args
+ self.sets[key].add(member1)
def smembers(self, key):
return self.sets.get(key, set())
@@ -840,7 +847,21 @@ class test_Redis:
def teardown(self):
self.connection.close()
- def test_publish__get(self):
+ @mock.replace_module_value(redis.redis, 'VERSION', [3, 0, 0])
+ def test_publish__get_redispyv3(self):
+ channel = self.connection.channel()
+ producer = Producer(channel, self.exchange, routing_key='test_Redis')
+ self.queue(channel).declare()
+
+ producer.publish({'hello': 'world'})
+
+ assert self.queue(channel).get().payload == {'hello': 'world'}
+ assert self.queue(channel).get() is None
+ assert self.queue(channel).get() is None
+ assert self.queue(channel).get() is None
+
+ @mock.replace_module_value(redis.redis, 'VERSION', [2, 5, 10])
+ def test_publish__get_redispyv2(self):
channel = self.connection.channel()
producer = Producer(channel, self.exchange, routing_key='test_Redis')
self.queue(channel).declare()