summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-05-18 15:38:37 +0100
committerAsk Solem <ask@celeryproject.org>2012-05-18 15:38:37 +0100
commitca1f5558a8a20c15bb5fc335a84e306c4c41ca57 (patch)
tree429d4f089845b1e01552f952670d9d3b964ca703
parent07a1fef5a1275ce9d703cfe9e0c30c2ae4fbd40b (diff)
downloadkombu-ca1f5558a8a20c15bb5fc335a84e306c4c41ca57.tar.gz
ensure_connection now supports a callback that is called for every second
-rw-r--r--kombu/connection.py11
-rw-r--r--kombu/utils/__init__.py10
2 files changed, 16 insertions, 5 deletions
diff --git a/kombu/connection.py b/kombu/connection.py
index aa681c7c..24c6f756 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -209,7 +209,8 @@ class BrokerConnection(object):
close = release
def ensure_connection(self, errback=None, max_retries=None,
- interval_start=2, interval_step=2, interval_max=30):
+ interval_start=2, interval_step=2, interval_max=30,
+ callback=None):
"""Ensure we have a connection to the server.
If not retry establishing the connection with the settings
@@ -227,11 +228,15 @@ class BrokerConnection(object):
for each retry.
:keyword interval_max: Maximum number of seconds to sleep between
each retry.
+ :keyword callback: Optional callback that is called for every
+ internal iteration (1 s)
+ :keyword callback: Optional callback that is called for every
+ internal iteration (1 s).
"""
retry_over_time(self.connect, self.connection_errors, (), {},
errback, max_retries,
- interval_start, interval_step, interval_max)
+ interval_start, interval_step, interval_max, callback)
return self
def revive(self, new_channel):
@@ -628,7 +633,7 @@ class BrokerConnection(object):
@property
def is_evented(self):
- return self.eventmap or getattr(self.transport, "on_poll_start", None)
+ return getattr(self.transport, "on_poll_start", None)
Connection = BrokerConnection
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 31f07ad6..3f5d0701 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -124,7 +124,8 @@ def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0):
def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
- max_retries=None, interval_start=2, interval_step=2, interval_max=30):
+ max_retries=None, interval_start=2, interval_step=2, interval_max=30,
+ callback=None):
"""Retry the function over and over until max retries is exceeded.
For each retry we sleep a for a while before we try again, this interval
@@ -159,9 +160,14 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
except catch, exc:
if max_retries is not None and retries > max_retries:
raise
+ if callback:
+ callback()
if errback:
errback(exc, interval)
- sleep(interval)
+ for i in fxrange(stop=interval):
+ if i and callback:
+ callback()
+ sleep(1.0)
def emergency_dump_state(state, open_file=open, dump=None):