summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKevin Fox <kevin_fox@me.com>2020-01-10 23:20:31 -0600
committerAsif Saif Uddin <auvipy@gmail.com>2020-01-11 11:20:31 +0600
commitcbd327dc3bdd124c1bc4fc3d8f2b451fd10e2da6 (patch)
treed0606f4f2b6e733c5acd86c758a939822349f8b3
parentfe5adb539ce9b351ffe3df0719ca281a736b0014 (diff)
downloadkombu-cbd327dc3bdd124c1bc4fc3d8f2b451fd10e2da6.tar.gz
Add support for health_check_interval option in broker_transport_options. (#1145)
* Add support for health_check_interval option in broker_transport_options. #1114 * Added Keivn Fox to authors. * Do not use default arg for pop() method. * Add dedicated test case for configurable health_check * Add test case for when health_check_interval is not supported by connection_class in redis. * Fix Flake8 * Added more test cases for redis. * Updated tests.
-rw-r--r--AUTHORS1
-rw-r--r--kombu/transport/redis.py25
-rw-r--r--t/unit/test_connection.py1
-rw-r--r--t/unit/transport/test_redis.py52
4 files changed, 71 insertions, 8 deletions
diff --git a/AUTHORS b/AUTHORS
index af22a804..e0b25360 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -78,6 +78,7 @@ Joshua Harlow <harlowja@gmail.com>
Juan Carlos Ferrer <juan.carlos@micronixsolutions.com>
Kai Groner <kai@gronr.com>
Keith Fitzgerald <ghostrocket@me.com>
+Kevin Fox <kevin_fox@me.com>
Kevin McCarthy <me@kevinmccarthy.org>
Kevin McDonald <k3vinmcdonald@gmail.com>
Latitia M. Haskins <lhaskins@jetsonsys.com>
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 95578509..6c177a4f 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -349,9 +349,9 @@ class MultiChannelPoller(object):
for channel in self._channels:
# only if subclient property is cached
client = channel.__dict__.get('subclient')
- if client is not None:
- if callable(getattr(client, "check_health", None)):
- client.check_health()
+ if client is not None \
+ and callable(getattr(client, 'check_health', None)):
+ client.check_health()
def on_readable(self, fileno):
chan, type = self._fd_to_chan[fileno]
@@ -428,6 +428,7 @@ class Channel(virtual.Channel):
socket_keepalive = None
socket_keepalive_options = None
max_connections = 10
+ health_check_interval = DEFAULT_HEALTH_CHECK_INTERVAL
#: Transport option to disable fanout keyprefix.
#: Can also be string, in which case it changes the default
#: prefix ('/{db}.') into to something else. The prefix must
@@ -491,14 +492,14 @@ class Channel(virtual.Channel):
'socket_keepalive_options',
'queue_order_strategy',
'max_connections',
+ 'health_check_interval',
'priority_steps') # <-- do not add comma here!
)
connection_class = redis.Connection if redis else None
def __init__(self, *args, **kwargs):
- super_ = super(Channel, self)
- super_.__init__(*args, **kwargs)
+ super(Channel, self).__init__(*args, **kwargs)
if not self.ack_emulation: # disable visibility timeout
self.QoS = virtual.QoS
@@ -905,14 +906,18 @@ class Channel(virtual.Channel):
'socket_connect_timeout': self.socket_connect_timeout,
'socket_keepalive': self.socket_keepalive,
'socket_keepalive_options': self.socket_keepalive_options,
+ 'health_check_interval': self.health_check_interval,
}
conn_class = self.connection_class
+
+ # If the connection class does not support the `health_check_interval`
+ # argument then remove it.
if (
hasattr(conn_class, '__init__') and
- accepts_argument(conn_class.__init__, 'health_check_interval')
+ not accepts_argument(conn_class.__init__, 'health_check_interval')
):
- connparams['health_check_interval'] = DEFAULT_HEALTH_CHECK_INTERVAL
+ connparams.pop('health_check_interval')
if conninfo.ssl:
# Connection(ssl={}) must be a dict containing the keys:
@@ -1064,8 +1069,12 @@ class Transport(virtual.Transport):
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
loop.on_tick.add(on_poll_start)
loop.call_repeatedly(10, cycle.maybe_restore_messages)
+ health_check_interval = connection.client.transport_options.get(
+ 'health_check_interval',
+ DEFAULT_HEALTH_CHECK_INTERVAL
+ )
loop.call_repeatedly(
- DEFAULT_HEALTH_CHECK_INTERVAL,
+ health_check_interval,
cycle.maybe_check_subclient_health
)
diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py
index 409b2b12..9bec0eb2 100644
--- a/t/unit/test_connection.py
+++ b/t/unit/test_connection.py
@@ -117,6 +117,7 @@ class test_connection_utils:
clone = deepcopy(conn)
assert clone.alt == ['amqp://host']
+ @skip.unless_module('sqlalchemy')
def test_parse_generated_as_uri_pg(self):
conn = Connection(self.pg_url)
assert conn.as_uri() == self.pg_nopass
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 6c0a5229..59136a0d 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -311,6 +311,18 @@ class test_Channel:
conn.channel()
pool.disconnect.assert_not_called()
+ def test_get_redis_ConnectionError(self):
+ from redis.exceptions import ConnectionError
+ from kombu.transport.redis import get_redis_ConnectionError
+ connection_error = get_redis_ConnectionError()
+ assert connection_error == ConnectionError
+
+ def test_after_fork_cleanup_channel(self):
+ from kombu.transport.redis import _after_fork_cleanup_channel
+ channel = Mock()
+ _after_fork_cleanup_channel(channel)
+ channel._after_fork.assert_called_once()
+
def test_after_fork(self):
self.channel._pool = None
self.channel._after_fork()
@@ -696,6 +708,20 @@ class test_Channel:
path = connection_parameters['path']
assert (password, path) == (None, '/var/run/redis.sock')
+ def test_connparams_health_check_interval_not_supported(self):
+ with patch('kombu.transport.redis.Channel._create_client'):
+ with Connection('redis+socket:///tmp/redis.sock') as conn:
+ conn.default_channel.connection_class = \
+ Mock(name='connection_class')
+ connparams = conn.default_channel._connparams()
+ assert 'health_check_interval' not in connparams
+
+ def test_connparams_health_check_interval_supported(self):
+ with patch('kombu.transport.redis.Channel._create_client'):
+ with Connection('redis+socket:///tmp/redis.sock') as conn:
+ connparams = conn.default_channel._connparams()
+ assert connparams['health_check_interval'] == 25
+
def test_rotate_cycle_ValueError(self):
cycle = self.channel._queue_cycle
cycle.update(['kramer', 'jerry'])
@@ -733,6 +759,7 @@ class test_Channel:
transport.cycle = Mock(name='cycle')
transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'}
conn = Mock(name='conn')
+ conn.client = Mock(name='client', transport_options={})
loop = Mock(name='loop')
redis.Transport.register_with_event_loop(transport, conn, loop)
transport.cycle.on_poll_init.assert_called_with(loop.poller)
@@ -750,6 +777,31 @@ class test_Channel:
call(13, transport.on_readable, 13),
])
+ def test_configurable_health_check(self):
+ transport = self.connection.transport
+ transport.cycle = Mock(name='cycle')
+ transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'}
+ conn = Mock(name='conn')
+ conn.client = Mock(name='client', transport_options={
+ 'health_check_interval': 15,
+ })
+ loop = Mock(name='loop')
+ redis.Transport.register_with_event_loop(transport, conn, loop)
+ transport.cycle.on_poll_init.assert_called_with(loop.poller)
+ loop.call_repeatedly.assert_has_calls([
+ call(10, transport.cycle.maybe_restore_messages),
+ call(15, transport.cycle.maybe_check_subclient_health),
+ ])
+ loop.on_tick.add.assert_called()
+ on_poll_start = loop.on_tick.add.call_args[0][0]
+
+ on_poll_start()
+ transport.cycle.on_poll_start.assert_called_with()
+ loop.add_reader.assert_has_calls([
+ call(12, transport.on_readable, 12),
+ call(13, transport.on_readable, 13),
+ ])
+
def test_transport_on_readable(self):
transport = self.connection.transport
cycle = transport.cycle = Mock(name='cyle')