diff options
author | tothegump <tothegump@gmail.com> | 2018-08-09 21:55:30 +0800 |
---|---|---|
committer | Omer Katz <omer.drow@gmail.com> | 2018-08-09 16:55:30 +0300 |
commit | 152923594fcc075016b12531f34782b056c9b5ba (patch) | |
tree | 2c6f3d20279f84123476ddb2b25be792b5c4ea35 | |
parent | 0c740a7b58fe8c4d8b5ac503f5ed4551c19b51fc (diff) | |
download | kombu-152923594fcc075016b12531f34782b056c9b5ba.tar.gz |
Add timeout to retry_over_time (#880)
add ut
-rw-r--r-- | kombu/connection.py | 7 | ||||
-rw-r--r-- | kombu/utils/functional.py | 11 | ||||
-rw-r--r-- | t/unit/utils/test_functional.py | 17 |
3 files changed, 30 insertions, 5 deletions
diff --git a/kombu/connection.py b/kombu/connection.py index e3af718d..ee2eb2dd 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -360,7 +360,8 @@ class Connection(object): def ensure_connection(self, errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30, - callback=None, reraise_as_library_errors=True): + callback=None, reraise_as_library_errors=True, + timeout=None): """Ensure we have a connection to the server. If not retry establishing the connection with the settings @@ -384,6 +385,8 @@ class Connection(object): each retry. callback (Callable): Optional callback that is called for every internal iteration (1 s). + timeout (int): Maximum amount of time in seconds to spend + waiting for connection """ def on_error(exc, intervals, retries, interval=0): round = self.completes_cycle(retries) @@ -402,7 +405,7 @@ class Connection(object): retry_over_time(self.connect, self.recoverable_connection_errors, (), {}, on_error, max_retries, interval_start, interval_step, interval_max, - callback) + callback, timeout=timeout) return self @contextmanager diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py index 750d5dba..12123b0b 100644 --- a/kombu/utils/functional.py +++ b/kombu/utils/functional.py @@ -7,7 +7,7 @@ import threading from collections import Iterable, Mapping, OrderedDict from itertools import count, repeat -from time import sleep +from time import sleep, time from vine.utils import wraps @@ -295,7 +295,7 @@ def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0): def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, max_retries=None, interval_start=2, interval_step=2, - interval_max=30, callback=None): + interval_max=30, callback=None, timeout=None): """Retry the function over and over until max retries is exceeded. For each retry we sleep a for a while before we try again, this interval @@ -316,23 +316,28 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, which return the time in seconds to sleep next, and ``retries`` is the number of previous retries. max_retries (int): Maximum number of retries before we give up. - If this is not set, we will retry forever. + If neither of this and timeout is set, we will retry forever. + If one of this and timeout is reached, stop. interval_start (float): How long (in seconds) we start sleeping between retries. interval_step (float): By how much the interval is increased for each retry. interval_max (float): Maximum number of seconds to sleep between retries. + timeout (int): Maximum seconds waiting before we give up. """ interval_range = fxrange(interval_start, interval_max + interval_start, interval_step, repeatlast=True) + end = time() + timeout if timeout else None for retries in count(): try: return fun(*args, **kwargs) except catch as exc: if max_retries and retries >= max_retries: raise + if end and time() > end: + raise if callback: callback() tts = float(errback(exc, interval_range, retries) if errback diff --git a/t/unit/utils/test_functional.py b/t/unit/utils/test_functional.py index 7a97b5b3..c8b4485e 100644 --- a/t/unit/utils/test_functional.py +++ b/t/unit/utils/test_functional.py @@ -1,6 +1,7 @@ from __future__ import absolute_import, unicode_literals import pickle + import pytest from itertools import count @@ -209,6 +210,22 @@ class test_retry_over_time: finally: utils.count = prev_count + def test_retry_timeout(self): + + with pytest.raises(self.Predicate): + retry_over_time( + self.myfun, self.Predicate, + errback=self.errback, interval_max=14, timeout=1 + ) + assert self.index == 1 + + # no errback + with pytest.raises(self.Predicate): + retry_over_time( + self.myfun, self.Predicate, + errback=None, timeout=1, + ) + @mock.sleepdeprived(module=utils) def test_retry_once(self): with pytest.raises(self.Predicate): |