summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortothegump <tothegump@gmail.com>2018-08-09 21:55:30 +0800
committerOmer Katz <omer.drow@gmail.com>2018-08-09 16:55:30 +0300
commit152923594fcc075016b12531f34782b056c9b5ba (patch)
tree2c6f3d20279f84123476ddb2b25be792b5c4ea35
parent0c740a7b58fe8c4d8b5ac503f5ed4551c19b51fc (diff)
downloadkombu-152923594fcc075016b12531f34782b056c9b5ba.tar.gz
Add timeout to retry_over_time (#880)
add ut
-rw-r--r--kombu/connection.py7
-rw-r--r--kombu/utils/functional.py11
-rw-r--r--t/unit/utils/test_functional.py17
3 files changed, 30 insertions, 5 deletions
diff --git a/kombu/connection.py b/kombu/connection.py
index e3af718d..ee2eb2dd 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -360,7 +360,8 @@ class Connection(object):
def ensure_connection(self, errback=None, max_retries=None,
interval_start=2, interval_step=2, interval_max=30,
- callback=None, reraise_as_library_errors=True):
+ callback=None, reraise_as_library_errors=True,
+ timeout=None):
"""Ensure we have a connection to the server.
If not retry establishing the connection with the settings
@@ -384,6 +385,8 @@ class Connection(object):
each retry.
callback (Callable): Optional callback that is called for every
internal iteration (1 s).
+ timeout (int): Maximum amount of time in seconds to spend
+ waiting for connection
"""
def on_error(exc, intervals, retries, interval=0):
round = self.completes_cycle(retries)
@@ -402,7 +405,7 @@ class Connection(object):
retry_over_time(self.connect, self.recoverable_connection_errors,
(), {}, on_error, max_retries,
interval_start, interval_step, interval_max,
- callback)
+ callback, timeout=timeout)
return self
@contextmanager
diff --git a/kombu/utils/functional.py b/kombu/utils/functional.py
index 750d5dba..12123b0b 100644
--- a/kombu/utils/functional.py
+++ b/kombu/utils/functional.py
@@ -7,7 +7,7 @@ import threading
from collections import Iterable, Mapping, OrderedDict
from itertools import count, repeat
-from time import sleep
+from time import sleep, time
from vine.utils import wraps
@@ -295,7 +295,7 @@ def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0):
def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
max_retries=None, interval_start=2, interval_step=2,
- interval_max=30, callback=None):
+ interval_max=30, callback=None, timeout=None):
"""Retry the function over and over until max retries is exceeded.
For each retry we sleep a for a while before we try again, this interval
@@ -316,23 +316,28 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
which return the time in seconds to sleep next, and ``retries``
is the number of previous retries.
max_retries (int): Maximum number of retries before we give up.
- If this is not set, we will retry forever.
+ If neither of this and timeout is set, we will retry forever.
+ If one of this and timeout is reached, stop.
interval_start (float): How long (in seconds) we start sleeping
between retries.
interval_step (float): By how much the interval is increased for
each retry.
interval_max (float): Maximum number of seconds to sleep
between retries.
+ timeout (int): Maximum seconds waiting before we give up.
"""
interval_range = fxrange(interval_start,
interval_max + interval_start,
interval_step, repeatlast=True)
+ end = time() + timeout if timeout else None
for retries in count():
try:
return fun(*args, **kwargs)
except catch as exc:
if max_retries and retries >= max_retries:
raise
+ if end and time() > end:
+ raise
if callback:
callback()
tts = float(errback(exc, interval_range, retries) if errback
diff --git a/t/unit/utils/test_functional.py b/t/unit/utils/test_functional.py
index 7a97b5b3..c8b4485e 100644
--- a/t/unit/utils/test_functional.py
+++ b/t/unit/utils/test_functional.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import, unicode_literals
import pickle
+
import pytest
from itertools import count
@@ -209,6 +210,22 @@ class test_retry_over_time:
finally:
utils.count = prev_count
+ def test_retry_timeout(self):
+
+ with pytest.raises(self.Predicate):
+ retry_over_time(
+ self.myfun, self.Predicate,
+ errback=self.errback, interval_max=14, timeout=1
+ )
+ assert self.index == 1
+
+ # no errback
+ with pytest.raises(self.Predicate):
+ retry_over_time(
+ self.myfun, self.Predicate,
+ errback=None, timeout=1,
+ )
+
@mock.sleepdeprived(module=utils)
def test_retry_once(self):
with pytest.raises(self.Predicate):