diff options
author | Kevin Fox <kevin_fox@me.com> | 2020-01-10 23:20:31 -0600 |
---|---|---|
committer | Asif Saif Uddin <auvipy@gmail.com> | 2020-01-11 11:20:31 +0600 |
commit | cbd327dc3bdd124c1bc4fc3d8f2b451fd10e2da6 (patch) | |
tree | d0606f4f2b6e733c5acd86c758a939822349f8b3 | |
parent | fe5adb539ce9b351ffe3df0719ca281a736b0014 (diff) | |
download | kombu-cbd327dc3bdd124c1bc4fc3d8f2b451fd10e2da6.tar.gz |
Add support for health_check_interval option in broker_transport_options. (#1145)
* Add support for health_check_interval option in broker_transport_options. #1114
* Added Keivn Fox to authors.
* Do not use default arg for pop() method.
* Add dedicated test case for configurable health_check
* Add test case for when health_check_interval is not supported by connection_class in redis.
* Fix Flake8
* Added more test cases for redis.
* Updated tests.
-rw-r--r-- | AUTHORS | 1 | ||||
-rw-r--r-- | kombu/transport/redis.py | 25 | ||||
-rw-r--r-- | t/unit/test_connection.py | 1 | ||||
-rw-r--r-- | t/unit/transport/test_redis.py | 52 |
4 files changed, 71 insertions, 8 deletions
@@ -78,6 +78,7 @@ Joshua Harlow <harlowja@gmail.com> Juan Carlos Ferrer <juan.carlos@micronixsolutions.com> Kai Groner <kai@gronr.com> Keith Fitzgerald <ghostrocket@me.com> +Kevin Fox <kevin_fox@me.com> Kevin McCarthy <me@kevinmccarthy.org> Kevin McDonald <k3vinmcdonald@gmail.com> Latitia M. Haskins <lhaskins@jetsonsys.com> diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 95578509..6c177a4f 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -349,9 +349,9 @@ class MultiChannelPoller(object): for channel in self._channels: # only if subclient property is cached client = channel.__dict__.get('subclient') - if client is not None: - if callable(getattr(client, "check_health", None)): - client.check_health() + if client is not None \ + and callable(getattr(client, 'check_health', None)): + client.check_health() def on_readable(self, fileno): chan, type = self._fd_to_chan[fileno] @@ -428,6 +428,7 @@ class Channel(virtual.Channel): socket_keepalive = None socket_keepalive_options = None max_connections = 10 + health_check_interval = DEFAULT_HEALTH_CHECK_INTERVAL #: Transport option to disable fanout keyprefix. #: Can also be string, in which case it changes the default #: prefix ('/{db}.') into to something else. The prefix must @@ -491,14 +492,14 @@ class Channel(virtual.Channel): 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', + 'health_check_interval', 'priority_steps') # <-- do not add comma here! ) connection_class = redis.Connection if redis else None def __init__(self, *args, **kwargs): - super_ = super(Channel, self) - super_.__init__(*args, **kwargs) + super(Channel, self).__init__(*args, **kwargs) if not self.ack_emulation: # disable visibility timeout self.QoS = virtual.QoS @@ -905,14 +906,18 @@ class Channel(virtual.Channel): 'socket_connect_timeout': self.socket_connect_timeout, 'socket_keepalive': self.socket_keepalive, 'socket_keepalive_options': self.socket_keepalive_options, + 'health_check_interval': self.health_check_interval, } conn_class = self.connection_class + + # If the connection class does not support the `health_check_interval` + # argument then remove it. if ( hasattr(conn_class, '__init__') and - accepts_argument(conn_class.__init__, 'health_check_interval') + not accepts_argument(conn_class.__init__, 'health_check_interval') ): - connparams['health_check_interval'] = DEFAULT_HEALTH_CHECK_INTERVAL + connparams.pop('health_check_interval') if conninfo.ssl: # Connection(ssl={}) must be a dict containing the keys: @@ -1064,8 +1069,12 @@ class Transport(virtual.Transport): [add_reader(fd, on_readable, fd) for fd in cycle.fds] loop.on_tick.add(on_poll_start) loop.call_repeatedly(10, cycle.maybe_restore_messages) + health_check_interval = connection.client.transport_options.get( + 'health_check_interval', + DEFAULT_HEALTH_CHECK_INTERVAL + ) loop.call_repeatedly( - DEFAULT_HEALTH_CHECK_INTERVAL, + health_check_interval, cycle.maybe_check_subclient_health ) diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py index 409b2b12..9bec0eb2 100644 --- a/t/unit/test_connection.py +++ b/t/unit/test_connection.py @@ -117,6 +117,7 @@ class test_connection_utils: clone = deepcopy(conn) assert clone.alt == ['amqp://host'] + @skip.unless_module('sqlalchemy') def test_parse_generated_as_uri_pg(self): conn = Connection(self.pg_url) assert conn.as_uri() == self.pg_nopass diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index 6c0a5229..59136a0d 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -311,6 +311,18 @@ class test_Channel: conn.channel() pool.disconnect.assert_not_called() + def test_get_redis_ConnectionError(self): + from redis.exceptions import ConnectionError + from kombu.transport.redis import get_redis_ConnectionError + connection_error = get_redis_ConnectionError() + assert connection_error == ConnectionError + + def test_after_fork_cleanup_channel(self): + from kombu.transport.redis import _after_fork_cleanup_channel + channel = Mock() + _after_fork_cleanup_channel(channel) + channel._after_fork.assert_called_once() + def test_after_fork(self): self.channel._pool = None self.channel._after_fork() @@ -696,6 +708,20 @@ class test_Channel: path = connection_parameters['path'] assert (password, path) == (None, '/var/run/redis.sock') + def test_connparams_health_check_interval_not_supported(self): + with patch('kombu.transport.redis.Channel._create_client'): + with Connection('redis+socket:///tmp/redis.sock') as conn: + conn.default_channel.connection_class = \ + Mock(name='connection_class') + connparams = conn.default_channel._connparams() + assert 'health_check_interval' not in connparams + + def test_connparams_health_check_interval_supported(self): + with patch('kombu.transport.redis.Channel._create_client'): + with Connection('redis+socket:///tmp/redis.sock') as conn: + connparams = conn.default_channel._connparams() + assert connparams['health_check_interval'] == 25 + def test_rotate_cycle_ValueError(self): cycle = self.channel._queue_cycle cycle.update(['kramer', 'jerry']) @@ -733,6 +759,7 @@ class test_Channel: transport.cycle = Mock(name='cycle') transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'} conn = Mock(name='conn') + conn.client = Mock(name='client', transport_options={}) loop = Mock(name='loop') redis.Transport.register_with_event_loop(transport, conn, loop) transport.cycle.on_poll_init.assert_called_with(loop.poller) @@ -750,6 +777,31 @@ class test_Channel: call(13, transport.on_readable, 13), ]) + def test_configurable_health_check(self): + transport = self.connection.transport + transport.cycle = Mock(name='cycle') + transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'} + conn = Mock(name='conn') + conn.client = Mock(name='client', transport_options={ + 'health_check_interval': 15, + }) + loop = Mock(name='loop') + redis.Transport.register_with_event_loop(transport, conn, loop) + transport.cycle.on_poll_init.assert_called_with(loop.poller) + loop.call_repeatedly.assert_has_calls([ + call(10, transport.cycle.maybe_restore_messages), + call(15, transport.cycle.maybe_check_subclient_health), + ]) + loop.on_tick.add.assert_called() + on_poll_start = loop.on_tick.add.call_args[0][0] + + on_poll_start() + transport.cycle.on_poll_start.assert_called_with() + loop.add_reader.assert_has_calls([ + call(12, transport.on_readable, 12), + call(13, transport.on_readable, 13), + ]) + def test_transport_on_readable(self): transport = self.connection.transport cycle = transport.cycle = Mock(name='cyle') |