diff options
author | Ask Solem <ask@celeryproject.org> | 2015-12-10 13:11:04 -0800 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2015-12-10 13:12:15 -0800 |
commit | de3f17a4e6c8bb020993fea169e1b8b1ea1e221f (patch) | |
tree | 61ba472f72b734f34a09d69ff7bd62bf66076f43 | |
parent | a98382f8b8564efd1e3db79bc38d9c0a8f723392 (diff) | |
download | kombu-de3f17a4e6c8bb020993fea169e1b8b1ea1e221f.tar.gz |
Redis: Only override disconnect for async connections
-rw-r--r-- | kombu/transport/redis.py | 45 |
1 files changed, 30 insertions, 15 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 35e1ca9e..23278780 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -407,6 +407,8 @@ class Channel(virtual.Channel): #: and binding keys (like a topic exchange but using PUB/SUB). #: This will be enabled by default in a future version. fanout_patterns = False + + _async_pool = None _pool = None from_transport_options = ( @@ -452,8 +454,7 @@ class Channel(virtual.Channel): try: self.client.info() except Exception: - if self._pool: - self._pool.disconnect() + self._disconnect_pools() raise self.connection.cycle.add(self) # add to channel poller. @@ -464,14 +465,21 @@ class Channel(virtual.Channel): register_after_fork(self, self._after_fork) def _after_fork(self): + self._disconnect_pools() + + def _disconnect_pools(self): + if self._async_pool is not None: + self._async_pool.disconnect() if self._pool is not None: self._pool.disconnect() + self._async_pool = self._pool = None def _on_connection_disconnect(self, connection): self._in_poll = False self._in_listen = False if self.connection and self.connection.cycle: self.connection.cycle._on_connection_disconnect(connection) + self._disconnect_pools() if not self._closing: raise get_redis_ConnectionError() @@ -749,8 +757,7 @@ class Channel(virtual.Channel): def close(self): self._closing = True - if self._pool: - self._pool.disconnect() + self._disconnect_pools() if not self.closed: # remove from channel poller. self.connection.cycle.discard(self) @@ -787,7 +794,7 @@ class Channel(virtual.Channel): )) return vhost - def _connparams(self): + def _connparams(self, async=False): conninfo = self.connection.client connparams = {'host': conninfo.hostname or '127.0.0.1', 'port': conninfo.port or DEFAULT_PORT, @@ -814,19 +821,22 @@ class Channel(virtual.Channel): redis.Connection ) - class Connection(connection_cls): - def disconnect(self): - channel._on_connection_disconnect(self) - super(Connection, self).disconnect() - connparams['connection_class'] = Connection + if async: + class Connection(connection_cls): + def disconnect(self): + super(Connection, self).disconnect() + channel._on_connection_disconnect(self) + connparams['connection_class'] = Connection return connparams - def _create_client(self): + def _create_client(self, async=False): + if async: + return self.Client(connection_pool=self.async_pool) return self.Client(connection_pool=self.pool) - def _get_pool(self): - params = self._connparams() + def _get_pool(self, async=False): + params = self._connparams(async=async) self.keyprefix_fanout = self.keyprefix_fanout.format(db=params['db']) return redis.ConnectionPool(**params) @@ -867,15 +877,20 @@ class Channel(virtual.Channel): self._pool = self._get_pool() return self._pool + @property + def async_pool(self): + if self._async_pool is None: + self._async_pool = self._get_pool(async=True) + @cached_property def client(self): """Client used to publish messages, BRPOP etc.""" - return self._create_client() + return self._create_client(async=True) @cached_property def subclient(self): """Pub/Sub connection used to consume fanout queues.""" - client = self._create_client() + client = self._create_client(async=True) pubsub = client.pubsub() pool = pubsub.connection_pool pubsub.connection = pool.get_connection('pubsub', pubsub.shard_hint) |