summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kombu/exceptions.py4
-rw-r--r--kombu/transport/pypika.py7
-rw-r--r--kombu/transport/pyredis.py15
3 files changed, 20 insertions, 6 deletions
diff --git a/kombu/exceptions.py b/kombu/exceptions.py
index 43c7fd85..bac4ac86 100644
--- a/kombu/exceptions.py
+++ b/kombu/exceptions.py
@@ -42,3 +42,7 @@ class ChannelLimitExceeded(LimitExceeded):
class StdChannelError(Exception):
pass
+
+
+class VersionMismatch(Exception):
+ pass
diff --git a/kombu/transport/pypika.py b/kombu/transport/pypika.py
index efb09e7d..4cf84a7e 100644
--- a/kombu/transport/pypika.py
+++ b/kombu/transport/pypika.py
@@ -10,18 +10,19 @@ Pika transport.
"""
import socket
+from kombu.exceptions import VersionMismatch
+from kombu.transport import base
+
from pika import channel # must be here to raise importerror for below.
try:
from pika import asyncore_adapter
except ImportError:
- raise ImportError("Kombu only works with pika version 0.5.2")
+ raise VersionMismatch("Kombu only works with pika version 0.5.2")
from pika import blocking_adapter
from pika import connection
from pika import exceptions
from pika.spec import Basic, BasicProperties
-from kombu.transport import base
-
DEFAULT_PORT = 5672
diff --git a/kombu/transport/pyredis.py b/kombu/transport/pyredis.py
index 692fb280..36b9d073 100644
--- a/kombu/transport/pyredis.py
+++ b/kombu/transport/pyredis.py
@@ -13,6 +13,7 @@ from Queue import Empty
from anyjson import serialize, deserialize
+from kombu.exceptions import VersionMismatch
from kombu.transport import virtual
from kombu.utils import eventio
from kombu.utils import cached_property
@@ -307,11 +308,19 @@ class Channel(virtual.Channel):
password=conninfo.password)
def _get_client(self):
- from redis import Redis, ConnectionError
+ import redis
+
+ version = getattr(redis, "__version__", (0, 0, 0))
+ if version:
+ version = tuple(version.split("."))
+ if version < (2, 4, 4):
+ raise VersionMismatch(
+ "Redis transport requires redis-py versions 2.4.4 or later. "
+ "You have %r" % (".".join(version), ))
# KombuRedis maintains a connection attribute on it's instance and
# uses that when executing commands
- class KombuRedis(Redis):
+ class KombuRedis(redis.Redis):
def __init__(self, *args, **kwargs):
super(KombuRedis, self).__init__(*args, **kwargs)
self.connection = self.connection_pool.get_connection('_')
@@ -322,7 +331,7 @@ class Channel(virtual.Channel):
try:
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)
- except ConnectionError:
+ except redis.ConnectionError:
conn.disconnect()
conn.send_command(*args)
return self.parse_response(conn, command_name, **options)