summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Runge <mrunge@redhat.com>2021-03-05 11:24:25 +0100
committerMatthias Runge <mrunge@redhat.com>2021-03-31 10:49:35 +0200
commita025b4d4ee9fe1bd722960ea03445034580bec6f (patch)
tree288419ccc6254d3ec5a6697bbdac914d293cc3dc
parentf2005df3d6565e050e4ca710df4eda518353b9c2 (diff)
downloadtooz-a025b4d4ee9fe1bd722960ea03445034580bec6f.tar.gz
Retry on redis connection errors
Sometimes, connections get closed by the server. This change adds retrying to the code. Change-Id: Iaab5ce609c0dcf7085f5dd43efbd37eb4b88f17b
-rw-r--r--requirements.txt2
-rw-r--r--tooz/drivers/redis.py132
2 files changed, 80 insertions, 54 deletions
diff --git a/requirements.txt b/requirements.txt
index 5796358..99a915a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,7 +6,7 @@ stevedore>=1.16.0 # Apache-2.0
voluptuous>=0.8.9 # BSD License
msgpack>=0.4.0 # Apache-2.0
fasteners>=0.7 # Apache-2.0
-tenacity>=3.2.1, <7.0.0 # Apache-2.0
+tenacity>=3.2.1,<7.0.0 # Apache-2.0
futurist>=1.2.0 # Apache-2.0
oslo.utils>=4.7.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0
diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py
index 0c01a1b..5e75350 100644
--- a/tooz/drivers/redis.py
+++ b/tooz/drivers/redis.py
@@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import contextlib
from distutils import version
import functools
import logging
@@ -35,19 +34,46 @@ from tooz import utils
LOG = logging.getLogger(__name__)
-@contextlib.contextmanager
-def _translate_failures():
- """Translates common redis exceptions into tooz exceptions."""
- try:
- yield
- except (exceptions.ConnectionError, exceptions.TimeoutError) as e:
- utils.raise_with_cause(coordination.ToozConnectionError,
- encodeutils.exception_to_unicode(e),
- cause=e)
- except exceptions.RedisError as e:
- utils.raise_with_cause(tooz.ToozError,
- encodeutils.exception_to_unicode(e),
- cause=e)
+def _handle_failures(func=None, n_tries=15):
+
+ """Translates common redis exceptions into tooz exceptions.
+
+ This also enables retrying on certain exceptions.
+
+ :param func: the function to act on
+ :param n_tries: the number of retries
+ """
+
+ if func is None:
+ return functools.partial(
+ _handle_failures,
+ n_tries=n_tries
+ )
+
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ ntries = n_tries
+ while ntries > 1:
+ try:
+ return func(*args, **kwargs)
+ except exceptions.ConnectionError as e:
+ # retry ntries times and then raise a connection error
+ ntries -= 1
+ if ntries >= 1:
+ utils.raise_with_cause(coordination.ToozConnectionError,
+ encodeutils.exception_to_unicode(e),
+ cause=e)
+
+ except (exceptions.TimeoutError) as e:
+ utils.raise_with_cause(coordination.ToozConnectionError,
+ encodeutils.exception_to_unicode(e),
+ cause=e)
+ except exceptions.RedisError as e:
+ utils.raise_with_cause(tooz.ToozError,
+ encodeutils.exception_to_unicode(e),
+ cause=e)
+ return func(*args, **kwargs)
+ return wrapper
class RedisLock(locking.Lock):
@@ -63,48 +89,48 @@ class RedisLock(locking.Lock):
self._coord = coord
self._client = client
+ @_handle_failures
def is_still_owner(self):
- with _translate_failures():
- lock_tok = self._lock.local.token
- if not lock_tok:
- return False
- owner_tok = self._client.get(self.name)
- return owner_tok == lock_tok
+ lock_tok = self._lock.local.token
+ if not lock_tok:
+ return False
+ owner_tok = self._client.get(self.name)
+ return owner_tok == lock_tok
+ @_handle_failures
def break_(self):
- with _translate_failures():
- return bool(self._client.delete(self.name))
+ return bool(self._client.delete(self.name))
+ @_handle_failures
def acquire(self, blocking=True, shared=False):
if shared:
raise tooz.NotImplemented
blocking, timeout = utils.convert_blocking(blocking)
- with _translate_failures():
- acquired = self._lock.acquire(
- blocking=blocking, blocking_timeout=timeout)
- if acquired:
- with self._exclusive_access:
- self._coord._acquired_locks.add(self)
- return acquired
-
+ acquired = self._lock.acquire(
+ blocking=blocking, blocking_timeout=timeout)
+ if acquired:
+ with self._exclusive_access:
+ self._coord._acquired_locks.add(self)
+ return acquired
+
+ @_handle_failures
def release(self):
with self._exclusive_access:
- with _translate_failures():
- try:
- self._lock.release()
- except exceptions.LockError as e:
- LOG.error("Unable to release lock '%r': %s", self, e)
- return False
- finally:
- self._coord._acquired_locks.discard(self)
- return True
+ try:
+ self._lock.release()
+ except exceptions.LockError as e:
+ LOG.error("Unable to release lock '%r': %s", self, e)
+ return False
+ finally:
+ self._coord._acquired_locks.discard(self)
+ return True
+ @_handle_failures
def heartbeat(self):
with self._exclusive_access:
if self.acquired:
- with _translate_failures():
- self._lock.reacquire()
- return True
+ self._lock.reacquire()
+ return True
return False
@property
@@ -433,6 +459,7 @@ return 1
return master_client
return redis.StrictRedis(**kwargs)
+ @_handle_failures
def _start(self):
super(RedisDriver, self)._start()
try:
@@ -446,8 +473,7 @@ return 1
# Ensure that the server is alive and not dead, this does not
# ensure the server will always be alive, but does insure that it
# at least is alive once...
- with _translate_failures():
- self._server_info = self._client.info()
+ self._server_info = self._client.info()
# Validate we have a good enough redis version we are connected
# to so that the basic set of features we support will actually
# work (instead of blowing up).
@@ -506,12 +532,13 @@ return 1
def _decode_group_id(self, group_id):
return utils.to_binary(group_id, encoding=self._encoding)
+ @_handle_failures
def heartbeat(self):
- with _translate_failures():
- beat_id = self._encode_beat_id(self._member_id)
- expiry_ms = max(0, int(self.membership_timeout * 1000.0))
- self._client.psetex(beat_id, time_ms=expiry_ms,
- value=self.STILL_ALIVE)
+ beat_id = self._encode_beat_id(self._member_id)
+ expiry_ms = max(0, int(self.membership_timeout * 1000.0))
+ self._client.psetex(beat_id, time_ms=expiry_ms,
+ value=self.STILL_ALIVE)
+
for lock in self._acquired_locks.copy():
try:
lock.heartbeat()
@@ -520,6 +547,7 @@ return 1
exc_info=True)
return min(self.lock_timeout, self.membership_timeout)
+ @_handle_failures
def _stop(self):
while self._acquired_locks:
lock = self._acquired_locks.pop()
@@ -534,8 +562,7 @@ return 1
try:
# NOTE(harlowja): this will delete nothing if the key doesn't
# exist in the first place, which is fine/expected/desired...
- with _translate_failures():
- self._client.delete(beat_id)
+ self._client.delete(beat_id)
except tooz.ToozError:
LOG.warning("Unable to delete heartbeat key '%s'", beat_id,
exc_info=True)
@@ -752,5 +779,4 @@ return 1
return result
-RedisFutureResult = functools.partial(coordination.CoordinatorResult,
- failure_translator=_translate_failures)
+RedisFutureResult = functools.partial(coordination.CoordinatorResult)