summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2015-12-10 13:11:04 -0800
committerAsk Solem <ask@celeryproject.org>2015-12-10 13:12:15 -0800
commitde3f17a4e6c8bb020993fea169e1b8b1ea1e221f (patch)
tree61ba472f72b734f34a09d69ff7bd62bf66076f43
parenta98382f8b8564efd1e3db79bc38d9c0a8f723392 (diff)
downloadkombu-de3f17a4e6c8bb020993fea169e1b8b1ea1e221f.tar.gz
Redis: Only override disconnect for async connections
-rw-r--r--kombu/transport/redis.py45
1 files changed, 30 insertions, 15 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 35e1ca9e..23278780 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -407,6 +407,8 @@ class Channel(virtual.Channel):
#: and binding keys (like a topic exchange but using PUB/SUB).
#: This will be enabled by default in a future version.
fanout_patterns = False
+
+ _async_pool = None
_pool = None
from_transport_options = (
@@ -452,8 +454,7 @@ class Channel(virtual.Channel):
try:
self.client.info()
except Exception:
- if self._pool:
- self._pool.disconnect()
+ self._disconnect_pools()
raise
self.connection.cycle.add(self) # add to channel poller.
@@ -464,14 +465,21 @@ class Channel(virtual.Channel):
register_after_fork(self, self._after_fork)
def _after_fork(self):
+ self._disconnect_pools()
+
+ def _disconnect_pools(self):
+ if self._async_pool is not None:
+ self._async_pool.disconnect()
if self._pool is not None:
self._pool.disconnect()
+ self._async_pool = self._pool = None
def _on_connection_disconnect(self, connection):
self._in_poll = False
self._in_listen = False
if self.connection and self.connection.cycle:
self.connection.cycle._on_connection_disconnect(connection)
+ self._disconnect_pools()
if not self._closing:
raise get_redis_ConnectionError()
@@ -749,8 +757,7 @@ class Channel(virtual.Channel):
def close(self):
self._closing = True
- if self._pool:
- self._pool.disconnect()
+ self._disconnect_pools()
if not self.closed:
# remove from channel poller.
self.connection.cycle.discard(self)
@@ -787,7 +794,7 @@ class Channel(virtual.Channel):
))
return vhost
- def _connparams(self):
+ def _connparams(self, async=False):
conninfo = self.connection.client
connparams = {'host': conninfo.hostname or '127.0.0.1',
'port': conninfo.port or DEFAULT_PORT,
@@ -814,19 +821,22 @@ class Channel(virtual.Channel):
redis.Connection
)
- class Connection(connection_cls):
- def disconnect(self):
- channel._on_connection_disconnect(self)
- super(Connection, self).disconnect()
- connparams['connection_class'] = Connection
+ if async:
+ class Connection(connection_cls):
+ def disconnect(self):
+ super(Connection, self).disconnect()
+ channel._on_connection_disconnect(self)
+ connparams['connection_class'] = Connection
return connparams
- def _create_client(self):
+ def _create_client(self, async=False):
+ if async:
+ return self.Client(connection_pool=self.async_pool)
return self.Client(connection_pool=self.pool)
- def _get_pool(self):
- params = self._connparams()
+ def _get_pool(self, async=False):
+ params = self._connparams(async=async)
self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db'])
return redis.ConnectionPool(**params)
@@ -867,15 +877,20 @@ class Channel(virtual.Channel):
self._pool = self._get_pool()
return self._pool
+ @property
+ def async_pool(self):
+ if self._async_pool is None:
+ self._async_pool = self._get_pool(async=True)
+
@cached_property
def client(self):
"""Client used to publish messages, BRPOP etc."""
- return self._create_client()
+ return self._create_client(async=True)
@cached_property
def subclient(self):
"""Pub/Sub connection used to consume fanout queues."""
- client = self._create_client()
+ client = self._create_client(async=True)
pubsub = client.pubsub()
pool = pubsub.connection_pool
pubsub.connection = pool.get_connection('pubsub', pubsub.shard_hint)