summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAshish Bansal (mrphantom) <bansal.ashish096@gmail.com>2019-12-02 03:42:13 +0530
committerAsif Saif Uddin <auvipy@gmail.com>2019-12-02 04:12:13 +0600
commitccc9e01f32eb874c69c2e07fbd48c1e07e3df358 (patch)
tree3c6fef50f124ef4c43debe8a0d18eaba623c0ff0
parentde8d8cf8c1f6cd6da8b5d0b4cd66130b1f129e21 (diff)
downloadkombu-ccc9e01f32eb874c69c2e07fbd48c1e07e3df358.tar.gz
Fix redis health checks (#1122)
* Fix redis transport health checks functionality * Add tests for accepts_argument util function * Reduce default health check interval to 25s
-rw-r--r--kombu/transport/redis.py16
-rw-r--r--kombu/utils/functional.py16
-rw-r--r--t/unit/transport/test_redis.py2
-rw-r--r--t/unit/utils/test_functional.py28
4 files changed, 58 insertions, 4 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 9348a1ba..95578509 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -23,6 +23,7 @@ from kombu.utils.scheduling import cycle_by_name
from kombu.utils.url import _parse_url
from kombu.utils.uuid import uuid
from kombu.utils.compat import _detect_environment
+from kombu.utils.functional import accepts_argument
from . import virtual
@@ -43,6 +44,8 @@ crit, warn = logger.critical, logger.warn
DEFAULT_PORT = 6379
DEFAULT_DB = 0
+DEFAULT_HEALTH_CHECK_INTERVAL = 25
+
PRIORITY_STEPS = [0, 3, 6, 9]
error_classes_t = namedtuple('error_classes_t', (
@@ -903,6 +906,14 @@ class Channel(virtual.Channel):
'socket_keepalive': self.socket_keepalive,
'socket_keepalive_options': self.socket_keepalive_options,
}
+
+ conn_class = self.connection_class
+ if (
+ hasattr(conn_class, '__init__') and
+ accepts_argument(conn_class.__init__, 'health_check_interval')
+ ):
+ connparams['health_check_interval'] = DEFAULT_HEALTH_CHECK_INTERVAL
+
if conninfo.ssl:
# Connection(ssl={}) must be a dict containing the keys:
# 'ssl_cert_reqs', 'ssl_ca_certs', 'ssl_certfile', 'ssl_keyfile'
@@ -1053,7 +1064,10 @@ class Transport(virtual.Transport):
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
loop.on_tick.add(on_poll_start)
loop.call_repeatedly(10, cycle.maybe_restore_messages)
- loop.call_repeatedly(30, cycle.maybe_check_subclient_health)
+ loop.call_repeatedly(
+ DEFAULT_HEALTH_CHECK_INTERVAL,
+ cycle.maybe_check_subclient_health
+ )
def on_readable(self, fileno):
"""Handle AIO event for one of our file descriptors."""
diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py
index 30c3ae8c..a793150e 100644
--- a/kombu/utils/functional.py
+++ b/kombu/utils/functional.py
@@ -4,6 +4,7 @@ from __future__ import absolute_import, unicode_literals
import random
import sys
import threading
+import inspect
from collections import OrderedDict
@@ -18,7 +19,7 @@ from time import sleep, time
from vine.utils import wraps
from kombu.five import (
- UserDict, items, keys, python_2_unicode_compatible, string_t,
+ UserDict, items, keys, python_2_unicode_compatible, string_t, PY3,
)
from .encoding import safe_repr as _safe_repr
@@ -372,6 +373,19 @@ def reprcall(name, args=(), kwargs=None, sep=', '):
)
+def accepts_argument(func, argument_name):
+ if PY3:
+ argument_spec = inspect.getfullargspec(func)
+ return (
+ argument_name in argument_spec.args or
+ argument_name in argument_spec.kwonlyargs
+ )
+
+ argument_spec = inspect.getargspec(func)
+ argument_names = argument_spec.args
+ return argument_name in argument_names
+
+
# Compat names (before kombu 3.0)
promise = lazy
maybe_promise = maybe_evaluate
diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py
index 58e22413..6c0a5229 100644
--- a/t/unit/transport/test_redis.py
+++ b/t/unit/transport/test_redis.py
@@ -738,7 +738,7 @@ class test_Channel:
transport.cycle.on_poll_init.assert_called_with(loop.poller)
loop.call_repeatedly.assert_has_calls([
call(10, transport.cycle.maybe_restore_messages),
- call(30, transport.cycle.maybe_check_subclient_health),
+ call(25, transport.cycle.maybe_check_subclient_health),
])
loop.on_tick.add.assert_called()
on_poll_start = loop.on_tick.add.call_args[0][0]
diff --git a/t/unit/utils/test_functional.py b/t/unit/utils/test_functional.py
index 2ed64761..b5ec03af 100644
--- a/t/unit/utils/test_functional.py
+++ b/t/unit/utils/test_functional.py
@@ -8,11 +8,14 @@ from itertools import count
from case import Mock, mock, skip
-from kombu.five import items
+from kombu.five import (
+ items, PY3,
+)
from kombu.utils import functional as utils
from kombu.utils.functional import (
ChannelPromise, LRUCache, fxrange, fxrangemax, memoize, lazy,
maybe_evaluate, maybe_list, reprcall, reprkwargs, retry_over_time,
+ accepts_argument,
)
@@ -310,3 +313,26 @@ def test_reprkwargs():
def test_reprcall():
assert reprcall('add', (2, 2), {'copy': True})
+
+
+class test_accepts_arg:
+ def function(self, foo, bar, baz="baz"):
+ pass
+
+ def test_valid_argument(self):
+ assert accepts_argument(self.function, 'self')
+ assert accepts_argument(self.function, 'foo')
+ assert accepts_argument(self.function, 'baz')
+
+ def test_invalid_argument(self):
+ assert not accepts_argument(self.function, 'random_argument')
+ if PY3:
+ assert not accepts_argument(test_accepts_arg, 'foo')
+
+ def test_raise_exception(self):
+ with pytest.raises(Exception):
+ accepts_argument(None, 'foo')
+
+ if not PY3:
+ with pytest.raises(Exception):
+ accepts_argument(test_accepts_arg, 'foo')