From ca1f5558a8a20c15bb5fc335a84e306c4c41ca57 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Fri, 18 May 2012 15:38:37 +0100 Subject: ensure_connection now supports a callback that is called for every second --- kombu/connection.py | 11 ++++++++--- kombu/utils/__init__.py | 10 ++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/kombu/connection.py b/kombu/connection.py index aa681c7c..24c6f756 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -209,7 +209,8 @@ class BrokerConnection(object): close = release def ensure_connection(self, errback=None, max_retries=None, - interval_start=2, interval_step=2, interval_max=30): + interval_start=2, interval_step=2, interval_max=30, + callback=None): """Ensure we have a connection to the server. If not retry establishing the connection with the settings @@ -227,11 +228,15 @@ class BrokerConnection(object): for each retry. :keyword interval_max: Maximum number of seconds to sleep between each retry. + :keyword callback: Optional callback that is called for every + internal iteration (1 s) + :keyword callback: Optional callback that is called for every + internal iteration (1 s). """ retry_over_time(self.connect, self.connection_errors, (), {}, errback, max_retries, - interval_start, interval_step, interval_max) + interval_start, interval_step, interval_max, callback) return self def revive(self, new_channel): @@ -628,7 +633,7 @@ class BrokerConnection(object): @property def is_evented(self): - return self.eventmap or getattr(self.transport, "on_poll_start", None) + return getattr(self.transport, "on_poll_start", None) Connection = BrokerConnection diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 31f07ad6..3f5d0701 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -124,7 +124,8 @@ def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0): def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, - max_retries=None, interval_start=2, interval_step=2, interval_max=30): + max_retries=None, interval_start=2, interval_step=2, interval_max=30, + callback=None): """Retry the function over and over until max retries is exceeded. For each retry we sleep a for a while before we try again, this interval @@ -159,9 +160,14 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, except catch, exc: if max_retries is not None and retries > max_retries: raise + if callback: + callback() if errback: errback(exc, interval) - sleep(interval) + for i in fxrange(stop=interval): + if i and callback: + callback() + sleep(1.0) def emergency_dump_state(state, open_file=open, dump=None): -- cgit v1.2.1