summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBar Shaul <88437685+barshaul@users.noreply.github.com>2022-11-10 13:16:49 +0200
committerGitHub <noreply@github.com>2022-11-10 13:16:49 +0200
commit67214cc3eaa7890c87e45550b8320779f954094b (patch)
tree3bca8b8913224255bdf72de79265ca0441cecb1c
parentbb06ccd52924800ac501d17c8a42038c8e5c5770 (diff)
downloadredis-py-67214cc3eaa7890c87e45550b8320779f954094b.tar.gz
Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
* Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements * Fixed linters * Type fixes * Added to CHANGES * Added getter and setter for the client's retry object and added more tests * Fixed linters * Fixed test * Fixed test_client_kill test * Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries * Fixing linters * Reverting deletion of connection_error_retry_attempts to maintain backward compatibility * Updating retry object for existing and new connections * Changed the default value of reinitialize_steps from 10 to 5 * fix review comments Co-authored-by: Chayim <chayim@users.noreply.github.com> Co-authored-by: dvora-h <dvora.heller@redis.com> Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
-rw-r--r--CHANGES2
-rw-r--r--redis/__init__.py2
-rw-r--r--redis/asyncio/__init__.py2
-rw-r--r--redis/asyncio/client.py7
-rw-r--r--redis/asyncio/cluster.py121
-rw-r--r--redis/asyncio/connection.py8
-rw-r--r--redis/backoff.py17
-rwxr-xr-xredis/client.py8
-rw-r--r--redis/cluster.py187
-rwxr-xr-xredis/connection.py9
-rw-r--r--tests/test_asyncio/test_cluster.py78
-rw-r--r--tests/test_asyncio/test_retry.py22
-rw-r--r--tests/test_cluster.py100
-rw-r--r--tests/test_retry.py16
14 files changed, 413 insertions, 166 deletions
diff --git a/CHANGES b/CHANGES
index 4945f61..883c548 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,8 @@
* ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
* Remove compatibility code for old versions of Hiredis, drop Packaging dependency
* The `deprecated` library is no longer a dependency
+ * Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
+ * Fixed "cannot pickle '_thread.lock' object" bug (#2354, #2297)
* Added CredentialsProvider class to support password rotation
* Enable Lock for asyncio cluster mode
diff --git a/redis/__init__.py b/redis/__init__.py
index 5201fe2..6503ac3 100644
--- a/redis/__init__.py
+++ b/redis/__init__.py
@@ -1,5 +1,6 @@
import sys
+from redis.backoff import default_backoff
from redis.client import Redis, StrictRedis
from redis.cluster import RedisCluster
from redis.connection import (
@@ -66,6 +67,7 @@ __all__ = [
"CredentialProvider",
"DataError",
"from_url",
+ "default_backoff",
"InvalidResponse",
"PubSubError",
"ReadOnlyError",
diff --git a/redis/asyncio/__init__.py b/redis/asyncio/__init__.py
index 598791a..bf90dde 100644
--- a/redis/asyncio/__init__.py
+++ b/redis/asyncio/__init__.py
@@ -15,6 +15,7 @@ from redis.asyncio.sentinel import (
SentinelManagedSSLConnection,
)
from redis.asyncio.utils import from_url
+from redis.backoff import default_backoff
from redis.exceptions import (
AuthenticationError,
AuthenticationWrongNumberOfArgsError,
@@ -43,6 +44,7 @@ __all__ = [
"ConnectionPool",
"DataError",
"from_url",
+ "default_backoff",
"InvalidResponse",
"PubSubError",
"ReadOnlyError",
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py
index c085571..e0ed85e 100644
--- a/redis/asyncio/client.py
+++ b/redis/asyncio/client.py
@@ -276,6 +276,13 @@ class Redis(
"""Get the connection's key-word arguments"""
return self.connection_pool.connection_kwargs
+ def get_retry(self) -> Optional["Retry"]:
+ return self.get_connection_kwargs().get("retry")
+
+ def set_retry(self, retry: "Retry") -> None:
+ self.get_connection_kwargs().update({"retry": retry})
+ self.connection_pool.set_retry(retry)
+
def load_external_module(self, funcname, func):
"""
This function can be used to add externally defined redis modules,
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 57aafbd..d5a38b2 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -26,6 +26,8 @@ from redis.asyncio.connection import (
)
from redis.asyncio.lock import Lock
from redis.asyncio.parser import CommandsParser
+from redis.asyncio.retry import Retry
+from redis.backoff import default_backoff
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
from redis.cluster import (
PIPELINE_BLOCKED_COMMANDS,
@@ -110,10 +112,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
:param startup_nodes:
| :class:`~.ClusterNode` to used as a startup node
:param require_full_coverage:
- | When set to ``False``: the client will not require a full coverage of the
- slots. However, if not all slots are covered, and at least one node has
- ``cluster-require-full-coverage`` set to ``yes``, the server will throw a
- :class:`~.ClusterDownError` for some key-based commands.
+ | When set to ``False``: the client will not require a full coverage of
+ the slots. However, if not all slots are covered, and at least one node
+ has ``cluster-require-full-coverage`` set to ``yes``, the server will throw
+ a :class:`~.ClusterDownError` for some key-based commands.
| When set to ``True``: all slots must be covered to construct the cluster
client. If not all slots are covered, :class:`~.RedisClusterException` will be
thrown.
@@ -136,7 +138,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
:param connection_error_retry_attempts:
| Number of times to retry before reinitializing when :class:`~.TimeoutError`
- or :class:`~.ConnectionError` are encountered
+ or :class:`~.ConnectionError` are encountered.
+ The default backoff strategy will be set if Retry object is not passed (see
+ default_backoff in backoff.py). To change it, pass a custom Retry object
+ using the "retry" keyword.
:param max_connections:
| Maximum number of connections per node. If there are no free connections & the
maximum number of connections are already created, a
@@ -214,9 +219,9 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
startup_nodes: Optional[List["ClusterNode"]] = None,
require_full_coverage: bool = True,
read_from_replicas: bool = False,
- reinitialize_steps: int = 10,
+ reinitialize_steps: int = 5,
cluster_error_retry_attempts: int = 3,
- connection_error_retry_attempts: int = 5,
+ connection_error_retry_attempts: int = 3,
max_connections: int = 2**31,
# Client related kwargs
db: Union[str, int] = 0,
@@ -235,6 +240,8 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
socket_keepalive: bool = False,
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
socket_timeout: Optional[float] = None,
+ retry: Optional["Retry"] = None,
+ retry_on_error: Optional[List[Exception]] = None,
# SSL related kwargs
ssl: bool = False,
ssl_ca_certs: Optional[str] = None,
@@ -282,6 +289,7 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
"socket_keepalive": socket_keepalive,
"socket_keepalive_options": socket_keepalive_options,
"socket_timeout": socket_timeout,
+ "retry": retry,
}
if ssl:
@@ -302,6 +310,18 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
# Call our on_connect function to configure READONLY mode
kwargs["redis_connect_func"] = self.on_connect
+ self.retry = retry
+ if retry or retry_on_error or connection_error_retry_attempts > 0:
+ # Set a retry object for all cluster nodes
+ self.retry = retry or Retry(
+ default_backoff(), connection_error_retry_attempts
+ )
+ if not retry_on_error:
+ # Default errors for retrying
+ retry_on_error = [ConnectionError, TimeoutError]
+ self.retry.update_supported_errors(retry_on_error)
+ kwargs.update({"retry": self.retry})
+
kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy()
self.connection_kwargs = kwargs
@@ -323,7 +343,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
self.reinitialize_steps = reinitialize_steps
self.cluster_error_retry_attempts = cluster_error_retry_attempts
self.connection_error_retry_attempts = connection_error_retry_attempts
-
self.reinitialize_counter = 0
self.commands_parser = CommandsParser()
self.node_flags = self.__class__.NODE_FLAGS.copy()
@@ -481,6 +500,16 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
return self.connection_kwargs
+ def get_retry(self) -> Optional["Retry"]:
+ return self.retry
+
+ def set_retry(self, retry: "Retry") -> None:
+ self.retry = retry
+ for node in self.get_nodes():
+ node.connection_kwargs.update({"retry": retry})
+ for conn in node._connections:
+ conn.retry = retry
+
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
"""Set a custom response callback."""
self.response_callbacks[command] = callback
@@ -618,9 +647,11 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
if passed_targets and not self._is_node_flag(passed_targets):
target_nodes = self._parse_target_nodes(passed_targets)
target_nodes_specified = True
- retry_attempts = 1
+ retry_attempts = 0
- for _ in range(retry_attempts):
+ # Add one for the first execution
+ execute_attempts = 1 + retry_attempts
+ for _ in range(execute_attempts):
if self._initialize:
await self.initialize()
try:
@@ -658,17 +689,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
)
return dict(zip(keys, values))
except Exception as e:
- if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
- # The nodes and slots cache were reinitialized.
+ if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
+ # The nodes and slots cache should be reinitialized.
# Try again with the new cluster setup.
- exception = e
+ retry_attempts -= 1
+ continue
else:
- # All other errors should be raised.
- raise
-
- # If it fails the configured number of times then raise exception back
- # to caller of this method
- raise exception
+ # raise the exception
+ raise e
async def _execute_command(
self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any
@@ -676,7 +704,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
asking = moved = False
redirect_addr = None
ttl = self.RedisClusterRequestTTL
- connection_error_retry_counter = 0
while ttl > 0:
ttl -= 1
@@ -695,25 +722,18 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
moved = False
return await target_node.execute_command(*args, **kwargs)
- except BusyLoadingError:
+ except (BusyLoadingError, MaxConnectionsError):
+ raise
+ except (ConnectionError, TimeoutError):
+ # Connection retries are being handled in the node's
+ # Retry object.
+ # Remove the failed node from the startup nodes before we try
+ # to reinitialize the cluster
+ self.nodes_manager.startup_nodes.pop(target_node.name, None)
+ # Hard force of reinitialize of the node/slots setup
+ # and try again with the new setup
+ await self.close()
raise
- except (ConnectionError, TimeoutError) as e:
- # Give the node 0.25 seconds to get back up and retry again with the
- # same node and configuration. After the defined number of attempts, try
- # to reinitialize the cluster and try again.
- connection_error_retry_counter += 1
- if (
- connection_error_retry_counter
- < self.connection_error_retry_attempts
- ):
- await asyncio.sleep(0.25)
- else:
- if isinstance(e, MaxConnectionsError):
- raise
- # Hard force of reinitialize of the node/slots setup
- # and try again with the new setup
- await self.close()
- raise
except ClusterDownError:
# ClusterDownError can occur during a failover and to get
# self-healed, we will try to reinitialize the cluster layout
@@ -1145,26 +1165,11 @@ class NodesManager:
)
cluster_slots = await startup_node.execute_command("CLUSTER SLOTS")
startup_nodes_reachable = True
- except (ConnectionError, TimeoutError) as e:
+ except Exception as e:
+ # Try the next startup node.
+ # The exception is saved and raised only if we have no more nodes.
exception = e
continue
- except ResponseError as e:
- # Isn't a cluster connection, so it won't parse these
- # exceptions automatically
- message = e.__str__()
- if "CLUSTERDOWN" in message or "MASTERDOWN" in message:
- continue
- else:
- raise RedisClusterException(
- 'ERROR sending "cluster slots" command to redis '
- f"server: {startup_node}. error: {message}"
- )
- except Exception as e:
- message = e.__str__()
- raise RedisClusterException(
- 'ERROR sending "cluster slots" command to redis '
- f"server {startup_node.name}. error: {message}"
- )
# CLUSTER SLOTS command results in the following output:
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
@@ -1245,8 +1250,8 @@ class NodesManager:
if not startup_nodes_reachable:
raise RedisClusterException(
- "Redis Cluster cannot be connected. Please provide at least "
- "one reachable node. "
+ f"Redis Cluster cannot be connected. Please provide at least "
+ f"one reachable node: {str(exception)}"
) from exception
# Check if the slots are not fully covered
diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py
index df066c4..4f19153 100644
--- a/redis/asyncio/connection.py
+++ b/redis/asyncio/connection.py
@@ -497,7 +497,7 @@ class Connection:
retry_on_error.append(socket.timeout)
retry_on_error.append(asyncio.TimeoutError)
self.retry_on_error = retry_on_error
- if retry_on_error:
+ if retry or retry_on_error:
if not retry:
self.retry = Retry(NoBackoff(), 1)
else:
@@ -1445,6 +1445,12 @@ class ConnectionPool:
if exc:
raise exc
+ def set_retry(self, retry: "Retry") -> None:
+ for conn in self._available_connections:
+ conn.retry = retry
+ for conn in self._in_use_connections:
+ conn.retry = retry
+
class BlockingConnectionPool(ConnectionPool):
"""
diff --git a/redis/backoff.py b/redis/backoff.py
index 5ccdb91..c62e760 100644
--- a/redis/backoff.py
+++ b/redis/backoff.py
@@ -1,6 +1,11 @@
import random
from abc import ABC, abstractmethod
+# Maximum backoff between each retry in seconds
+DEFAULT_CAP = 0.512
+# Minimum backoff between each retry in seconds
+DEFAULT_BASE = 0.008
+
class AbstractBackoff(ABC):
"""Backoff interface"""
@@ -40,7 +45,7 @@ class NoBackoff(ConstantBackoff):
class ExponentialBackoff(AbstractBackoff):
"""Exponential backoff upon failure"""
- def __init__(self, cap, base):
+ def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
@@ -55,7 +60,7 @@ class ExponentialBackoff(AbstractBackoff):
class FullJitterBackoff(AbstractBackoff):
"""Full jitter backoff upon failure"""
- def __init__(self, cap, base):
+ def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
@@ -70,7 +75,7 @@ class FullJitterBackoff(AbstractBackoff):
class EqualJitterBackoff(AbstractBackoff):
"""Equal jitter backoff upon failure"""
- def __init__(self, cap, base):
+ def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
@@ -86,7 +91,7 @@ class EqualJitterBackoff(AbstractBackoff):
class DecorrelatedJitterBackoff(AbstractBackoff):
"""Decorrelated jitter backoff upon failure"""
- def __init__(self, cap, base):
+ def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE):
"""
`cap`: maximum backoff time in seconds
`base`: base backoff time in seconds
@@ -103,3 +108,7 @@ class DecorrelatedJitterBackoff(AbstractBackoff):
temp = random.uniform(self._base, max_backoff)
self._previous_backoff = min(self._cap, temp)
return self._previous_backoff
+
+
+def default_backoff():
+ return EqualJitterBackoff()
diff --git a/redis/client.py b/redis/client.py
index 8356ba7..ed857c8 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -26,6 +26,7 @@ from redis.exceptions import (
WatchError,
)
from redis.lock import Lock
+from redis.retry import Retry
from redis.utils import safe_str, str_if_bytes
SYM_EMPTY = b""
@@ -1047,6 +1048,13 @@ class Redis(AbstractRedis, RedisModuleCommands, CoreCommands, SentinelCommands):
"""Get the connection's key-word arguments"""
return self.connection_pool.connection_kwargs
+ def get_retry(self) -> Optional["Retry"]:
+ return self.get_connection_kwargs().get("retry")
+
+ def set_retry(self, retry: "Retry") -> None:
+ self.get_connection_kwargs().update({"retry": retry})
+ self.connection_pool.set_retry(retry)
+
def set_response_callback(self, command, callback):
"""Set a custom Response Callback"""
self.response_callbacks[command] = callback
diff --git a/redis/cluster.py b/redis/cluster.py
index 027fe40..91deaea 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -1,12 +1,12 @@
-import copy
import random
import socket
import sys
import threading
import time
from collections import OrderedDict
-from typing import Any, Callable, Dict, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+from redis.backoff import default_backoff
from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan
from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands
from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
@@ -29,6 +29,7 @@ from redis.exceptions import (
TryAgainError,
)
from redis.lock import Lock
+from redis.retry import Retry
from redis.utils import (
dict_merge,
list_keys_to_dict,
@@ -426,27 +427,28 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
def __init__(
self,
- host=None,
- port=6379,
- startup_nodes=None,
- cluster_error_retry_attempts=3,
- require_full_coverage=False,
- reinitialize_steps=10,
- read_from_replicas=False,
- dynamic_startup_nodes=True,
- url=None,
+ host: Optional[str] = None,
+ port: int = 6379,
+ startup_nodes: Optional[List["ClusterNode"]] = None,
+ cluster_error_retry_attempts: int = 3,
+ retry: Optional["Retry"] = None,
+ require_full_coverage: bool = False,
+ reinitialize_steps: int = 5,
+ read_from_replicas: bool = False,
+ dynamic_startup_nodes: bool = True,
+ url: Optional[str] = None,
**kwargs,
):
"""
Initialize a new RedisCluster client.
- :startup_nodes: 'list[ClusterNode]'
+ :param startup_nodes:
List of nodes from which initial bootstrapping can be done
- :host: 'str'
+ :param host:
Can be used to point to a startup node
- :port: 'int'
+ :param port:
Can be used to point to a startup node
- :require_full_coverage: 'bool'
+ :param require_full_coverage:
When set to False (default value): the client will not require a
full coverage of the slots. However, if not all slots are covered,
and at least one node has 'cluster-require-full-coverage' set to
@@ -456,12 +458,12 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
When set to True: all slots must be covered to construct the
cluster client. If not all slots are covered, RedisClusterException
will be thrown.
- :read_from_replicas: 'bool'
+ :param read_from_replicas:
Enable read from replicas in READONLY mode. You can read possibly
stale data.
When set to true, read commands will be assigned between the
primary and its replications in a Round-Robin manner.
- :dynamic_startup_nodes: 'bool'
+ :param dynamic_startup_nodes:
Set the RedisCluster's startup nodes to all of the discovered nodes.
If true (default value), the cluster's discovered nodes will be used to
determine the cluster nodes-slots mapping in the next topology refresh.
@@ -469,10 +471,11 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
listed in the CLUSTER SLOTS output.
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
specific IP addresses, it is best to set it to false.
- :cluster_error_retry_attempts: 'int'
- Retry command execution attempts when encountering ClusterDownError
- or ConnectionError
- :reinitialize_steps: 'int'
+ :param cluster_error_retry_attempts:
+ Number of times to retry before raising an error when
+ :class:`~.TimeoutError` or :class:`~.ConnectionError` or
+ :class:`~.ClusterDownError` are encountered
+ :param reinitialize_steps:
Specifies the number of MOVED errors that need to occur before
reinitializing the whole cluster topology. If a MOVED error occurs
and the cluster does not need to be reinitialized on this current
@@ -540,6 +543,11 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
kwargs.update({"redis_connect_func": self.on_connect})
kwargs = cleanup_kwargs(**kwargs)
+ if retry:
+ self.retry = retry
+ kwargs.update({"retry": self.retry})
+ else:
+ kwargs.update({"retry": Retry(default_backoff(), 0)})
self.encoder = Encoder(
kwargs.get("encoding", "utf-8"),
@@ -666,6 +674,14 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
self.nodes_manager.default_node = node
return True
+ def get_retry(self) -> Optional["Retry"]:
+ return self.retry
+
+ def set_retry(self, retry: "Retry") -> None:
+ self.retry = retry
+ for node in self.get_nodes():
+ node.redis_connection.set_retry(retry)
+
def monitor(self, target_node=None):
"""
Returns a Monitor object for the specified target node.
@@ -986,12 +1002,13 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
# nodes were passed to this function, we cannot retry the command
# execution since the nodes may not be valid anymore after the tables
# were reinitialized. So in case of passed target nodes,
- # retry_attempts will be set to 1.
+ # retry_attempts will be set to 0.
retry_attempts = (
- 1 if target_nodes_specified else self.cluster_error_retry_attempts
+ 0 if target_nodes_specified else self.cluster_error_retry_attempts
)
- exception = None
- for _ in range(0, retry_attempts):
+ # Add one for the first execution
+ execute_attempts = 1 + retry_attempts
+ for _ in range(execute_attempts):
try:
res = {}
if not target_nodes_specified:
@@ -1008,18 +1025,15 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
# Return the processed result
return self._process_result(args[0], res, **kwargs)
except Exception as e:
- if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
+ if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
# The nodes and slots cache were reinitialized.
# Try again with the new cluster setup.
- exception = e
+ retry_attempts -= 1
+ continue
else:
- # All other errors should be raised.
+ # raise the exception
raise e
- # If it fails the configured number of times then raise exception back
- # to caller of this method
- raise exception
-
def _execute_command(self, target_node, *args, **kwargs):
"""
Send a command to a node in the cluster
@@ -1031,7 +1045,6 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
asking = False
moved = False
ttl = int(self.RedisClusterRequestTTL)
- connection_error_retry_counter = 0
while ttl > 0:
ttl -= 1
@@ -1064,25 +1077,21 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
except AuthenticationError:
raise
except (ConnectionError, TimeoutError) as e:
+ # Connection retries are being handled in the node's
+ # Retry object.
# ConnectionError can also be raised if we couldn't get a
# connection from the pool before timing out, so check that
# this is an actual connection before attempting to disconnect.
if connection is not None:
connection.disconnect()
- connection_error_retry_counter += 1
-
- # Give the node 0.25 seconds to get back up and retry again
- # with same node and configuration. After 5 attempts then try
- # to reinitialize the cluster and see if the nodes
- # configuration has changed or not
- if connection_error_retry_counter < 5:
- time.sleep(0.25)
- else:
- # Hard force of reinitialize of the node/slots setup
- # and try again with the new setup
- target_node.redis_connection = None
- self.nodes_manager.initialize()
- raise e
+
+ # Remove the failed node from the startup nodes before we try
+ # to reinitialize the cluster
+ self.nodes_manager.startup_nodes.pop(target_node.name, None)
+ # Reset the cluster node's connection
+ target_node.redis_connection = None
+ self.nodes_manager.initialize()
+ raise e
except MovedError as e:
# First, we will try to patch the slots/nodes cache with the
# redirected node output and try again. If MovedError exceeds
@@ -1406,17 +1415,15 @@ class NodesManager:
startup_nodes_reachable = False
fully_covered = False
kwargs = self.connection_kwargs
+ exception = None
for startup_node in self.startup_nodes.values():
try:
if startup_node.redis_connection:
r = startup_node.redis_connection
else:
- # Create a new Redis connection and let Redis decode the
- # responses so we won't need to handle that
- copy_kwargs = copy.deepcopy(kwargs)
- copy_kwargs.update({"decode_responses": True, "encoding": "utf-8"})
+ # Create a new Redis connection
r = self.create_redis_node(
- startup_node.host, startup_node.port, **copy_kwargs
+ startup_node.host, startup_node.port, **kwargs
)
self.startup_nodes[startup_node.name].redis_connection = r
# Make sure cluster mode is enabled on this node
@@ -1426,25 +1433,11 @@ class NodesManager:
)
cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
startup_nodes_reachable = True
- except (ConnectionError, TimeoutError):
- continue
- except ResponseError as e:
- # Isn't a cluster connection, so it won't parse these
- # exceptions automatically
- message = e.__str__()
- if "CLUSTERDOWN" in message or "MASTERDOWN" in message:
- continue
- else:
- raise RedisClusterException(
- 'ERROR sending "cluster slots" command to redis '
- f"server: {startup_node}. error: {message}"
- )
except Exception as e:
- message = e.__str__()
- raise RedisClusterException(
- 'ERROR sending "cluster slots" command to redis '
- f"server {startup_node.name}. error: {message}"
- )
+ # Try the next startup node.
+ # The exception is saved and raised only if we have no more nodes.
+ exception = e
+ continue
# CLUSTER SLOTS command results in the following output:
# [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
@@ -1514,9 +1507,9 @@ class NodesManager:
if not startup_nodes_reachable:
raise RedisClusterException(
- "Redis Cluster cannot be connected. Please provide at least "
- "one reachable node. "
- )
+ f"Redis Cluster cannot be connected. Please provide at least "
+ f"one reachable node: {str(exception)}"
+ ) from exception
# Create Redis connections to all nodes
self.create_redis_connections(list(tmp_nodes_cache.values()))
@@ -1699,14 +1692,14 @@ class ClusterPipeline(RedisCluster):
def __init__(
self,
- nodes_manager,
- commands_parser,
- result_callbacks=None,
- cluster_response_callbacks=None,
- startup_nodes=None,
- read_from_replicas=False,
- cluster_error_retry_attempts=5,
- reinitialize_steps=10,
+ nodes_manager: "NodesManager",
+ commands_parser: "CommandsParser",
+ result_callbacks: Optional[Dict[str, Callable]] = None,
+ cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
+ startup_nodes: Optional[List["ClusterNode"]] = None,
+ read_from_replicas: bool = False,
+ cluster_error_retry_attempts: int = 3,
+ reinitialize_steps: int = 5,
lock=None,
**kwargs,
):
@@ -1858,22 +1851,22 @@ class ClusterPipeline(RedisCluster):
"""
if not stack:
return []
-
- for _ in range(0, self.cluster_error_retry_attempts):
+ retry_attempts = self.cluster_error_retry_attempts
+ while True:
try:
return self._send_cluster_commands(
stack,
raise_on_error=raise_on_error,
allow_redirections=allow_redirections,
)
- except ClusterDownError:
- # Try again with the new cluster setup. All other errors
- # should be raised.
- pass
-
- # If it fails the configured number of times then raise
- # exception back to caller of this method
- raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")
+ except (ClusterDownError, ConnectionError) as e:
+ if retry_attempts > 0:
+ # Try again with the new cluster setup. All other errors
+ # should be raised.
+ retry_attempts -= 1
+ pass
+ else:
+ raise e
def _send_cluster_commands(
self, stack, raise_on_error=True, allow_redirections=True
@@ -1898,7 +1891,6 @@ class ClusterPipeline(RedisCluster):
# we figure out the slot number that command maps to, then from
# the slot determine the node.
for c in attempt:
- connection_error_retry_counter = 0
while True:
# refer to our internal node -> slot table that
# tells us where a given command should route to.
@@ -1931,13 +1923,10 @@ class ClusterPipeline(RedisCluster):
try:
connection = get_connection(redis_node, c.args)
except ConnectionError:
- connection_error_retry_counter += 1
- if connection_error_retry_counter < 5:
- # reinitialize the node -> slot table
- self.nodes_manager.initialize()
- continue
- else:
- raise
+ # Connection retries are being handled in the node's
+ # Retry object. Reinitialize the node -> slot table.
+ self.nodes_manager.initialize()
+ raise
nodes[node_name] = NodeCommands(
redis_node.parse_response,
redis_node.connection_pool,
diff --git a/redis/connection.py b/redis/connection.py
index a2b0074..9c5b536 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -540,7 +540,7 @@ class Connection:
# Add TimeoutError to the errors list to retry on
retry_on_error.append(TimeoutError)
self.retry_on_error = retry_on_error
- if retry_on_error:
+ if retry or retry_on_error:
if retry is None:
self.retry = Retry(NoBackoff(), 1)
else:
@@ -1467,6 +1467,13 @@ class ConnectionPool:
for connection in connections:
connection.disconnect()
+ def set_retry(self, retry: "Retry") -> None:
+ self.connection_kwargs.update({"retry": retry})
+ for conn in self._available_connections:
+ conn.retry = retry
+ for conn in self._in_use_connections:
+ conn.retry = retry
+
class BlockingConnectionPool(ConnectionPool):
"""
diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py
index 27f1190..38bcaf6 100644
--- a/tests/test_asyncio/test_cluster.py
+++ b/tests/test_asyncio/test_cluster.py
@@ -13,6 +13,8 @@ from _pytest.fixtures import FixtureRequest
from redis.asyncio.cluster import ClusterNode, NodesManager, RedisCluster
from redis.asyncio.connection import Connection, SSLConnection
from redis.asyncio.parser import CommandsParser
+from redis.asyncio.retry import Retry
+from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
from redis.cluster import PIPELINE_BLOCKED_COMMANDS, PRIMARY, REPLICA, get_node_name
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
@@ -247,6 +249,76 @@ class TestRedisClusterObj:
]
)
+ async def test_cluster_set_get_retry_object(self, request: FixtureRequest):
+ retry = Retry(NoBackoff(), 2)
+ url = request.config.getoption("--redis-url")
+ async with RedisCluster.from_url(url, retry=retry) as r:
+ assert r.get_retry()._retries == retry._retries
+ assert isinstance(r.get_retry()._backoff, NoBackoff)
+ for node in r.get_nodes():
+ n_retry = node.connection_kwargs.get("retry")
+ assert n_retry is not None
+ assert n_retry._retries == retry._retries
+ assert isinstance(n_retry._backoff, NoBackoff)
+ rand_cluster_node = r.get_random_node()
+ existing_conn = rand_cluster_node.acquire_connection()
+ # Change retry policy
+ new_retry = Retry(ExponentialBackoff(), 3)
+ r.set_retry(new_retry)
+ assert r.get_retry()._retries == new_retry._retries
+ assert isinstance(r.get_retry()._backoff, ExponentialBackoff)
+ for node in r.get_nodes():
+ n_retry = node.connection_kwargs.get("retry")
+ assert n_retry is not None
+ assert n_retry._retries == new_retry._retries
+ assert isinstance(n_retry._backoff, ExponentialBackoff)
+ assert existing_conn.retry._retries == new_retry._retries
+ new_conn = rand_cluster_node.acquire_connection()
+ assert new_conn.retry._retries == new_retry._retries
+
+ async def test_cluster_retry_object(self, request: FixtureRequest) -> None:
+ url = request.config.getoption("--redis-url")
+ async with RedisCluster.from_url(url) as rc_default:
+ # Test default retry
+ retry = rc_default.connection_kwargs.get("retry")
+ assert isinstance(retry, Retry)
+ assert retry._retries == 3
+ assert isinstance(retry._backoff, type(default_backoff()))
+ assert rc_default.get_node("127.0.0.1", 16379).connection_kwargs.get(
+ "retry"
+ ) == rc_default.get_node("127.0.0.1", 16380).connection_kwargs.get("retry")
+
+ retry = Retry(ExponentialBackoff(10, 5), 5)
+ async with RedisCluster.from_url(url, retry=retry) as rc_custom_retry:
+ # Test custom retry
+ assert (
+ rc_custom_retry.get_node("127.0.0.1", 16379).connection_kwargs.get(
+ "retry"
+ )
+ == retry
+ )
+
+ async with RedisCluster.from_url(
+ url, connection_error_retry_attempts=0
+ ) as rc_no_retries:
+ # Test no connection retries
+ assert (
+ rc_no_retries.get_node("127.0.0.1", 16379).connection_kwargs.get(
+ "retry"
+ )
+ is None
+ )
+
+ async with RedisCluster.from_url(
+ url, retry=Retry(NoBackoff(), 0)
+ ) as rc_no_retries:
+ assert (
+ rc_no_retries.get_node("127.0.0.1", 16379)
+ .connection_kwargs.get("retry")
+ ._retries
+ == 0
+ )
+
async def test_empty_startup_nodes(self) -> None:
"""
Test that exception is raised when empty providing empty startup_nodes
@@ -1289,8 +1361,11 @@ class TestClusterRedisCommands:
assert "addr" in info
@skip_if_server_version_lt("2.6.9")
- async def test_client_kill(self, r: RedisCluster, r2: RedisCluster) -> None:
+ async def test_client_kill(
+ self, r: RedisCluster, create_redis: Callable[..., RedisCluster]
+ ) -> None:
node = r.get_primaries()[0]
+ r2 = await create_redis(cls=RedisCluster, flushdb=False)
await r.client_setname("redis-py-c1", target_nodes="all")
await r2.client_setname("redis-py-c2", target_nodes="all")
clients = [
@@ -1311,6 +1386,7 @@ class TestClusterRedisCommands:
]
assert len(clients) == 1
assert clients[0].get("name") == "redis-py-c1"
+ await r2.close()
@skip_if_server_version_lt("2.6.0")
async def test_cluster_bitop_not_empty_string(self, r: RedisCluster) -> None:
diff --git a/tests/test_asyncio/test_retry.py b/tests/test_asyncio/test_retry.py
index 38e353b..86e6ddf 100644
--- a/tests/test_asyncio/test_retry.py
+++ b/tests/test_asyncio/test_retry.py
@@ -1,8 +1,9 @@
import pytest
+from redis.asyncio import Redis
from redis.asyncio.connection import Connection, UnixDomainSocketConnection
from redis.asyncio.retry import Retry
-from redis.backoff import AbstractBackoff, NoBackoff
+from redis.backoff import AbstractBackoff, ExponentialBackoff, NoBackoff
from redis.exceptions import ConnectionError, TimeoutError
@@ -114,3 +115,22 @@ class TestRetry:
assert self.actual_attempts == 5
assert self.actual_failures == 5
+
+
+class TestRedisClientRetry:
+ "Test the Redis client behavior with retries"
+
+ async def test_get_set_retry_object(self, request):
+ retry = Retry(NoBackoff(), 2)
+ url = request.config.getoption("--redis-url")
+ r = await Redis.from_url(url, retry_on_timeout=True, retry=retry)
+ assert r.get_retry()._retries == retry._retries
+ assert isinstance(r.get_retry()._backoff, NoBackoff)
+ new_retry_policy = Retry(ExponentialBackoff(), 3)
+ exiting_conn = await r.connection_pool.get_connection("_")
+ r.set_retry(new_retry_policy)
+ assert r.get_retry()._retries == new_retry_policy._retries
+ assert isinstance(r.get_retry()._backoff, ExponentialBackoff)
+ assert exiting_conn.retry._retries == new_retry_policy._retries
+ new_conn = await r.connection_pool.get_connection("_")
+ assert new_conn.retry._retries == new_retry_policy._retries
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 5652673..d18fbbb 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -7,6 +7,7 @@ from unittest.mock import DEFAULT, Mock, call, patch
import pytest
from redis import Redis
+from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
from redis.cluster import (
PRIMARY,
REDIS_CLUSTER_HASH_SLOTS,
@@ -31,6 +32,7 @@ from redis.exceptions import (
ResponseError,
TimeoutError,
)
+from redis.retry import Retry
from redis.utils import str_if_bytes
from tests.test_pubsub import wait_for_message
@@ -358,6 +360,60 @@ class TestRedisClusterObj:
assert r.execute_command("SET", "foo", "bar") == "MOCK_OK"
+ def test_handling_cluster_failover_to_a_replica(self, r):
+ # Set the key we'll test for
+ key = "key"
+ r.set("key", "value")
+ primary = r.get_node_from_key(key, replica=False)
+ assert str_if_bytes(r.get("key")) == "value"
+ # Get the current output of cluster slots
+ cluster_slots = primary.redis_connection.execute_command("CLUSTER SLOTS")
+ replica_host = ""
+ replica_port = 0
+ # Replace one of the replicas to be the new primary based on the
+ # cluster slots output
+ for slot_range in cluster_slots:
+ primary_port = slot_range[2][1]
+ if primary_port == primary.port:
+ if len(slot_range) <= 3:
+ # cluster doesn't have a replica, return
+ return
+ replica_host = str_if_bytes(slot_range[3][0])
+ replica_port = slot_range[3][1]
+ # replace replica and primary in the cluster slots output
+ tmp_node = slot_range[2]
+ slot_range[2] = slot_range[3]
+ slot_range[3] = tmp_node
+ break
+
+ def raise_connection_error():
+ raise ConnectionError("error")
+
+ def mock_execute_command(*_args, **_kwargs):
+ if _args[0] == "CLUSTER SLOTS":
+ return cluster_slots
+ else:
+ raise Exception("Failed to mock cluster slots")
+
+ # Mock connection error for the current primary
+ mock_node_resp_func(primary, raise_connection_error)
+ primary.redis_connection.set_retry(Retry(NoBackoff(), 1))
+
+ # Mock the cluster slots response for all other nodes
+ redis_mock_node = Mock()
+ redis_mock_node.execute_command.side_effect = mock_execute_command
+ # Mock response value for all other commands
+ redis_mock_node.parse_response.return_value = "MOCK_OK"
+ for node in r.get_nodes():
+ if node.port != primary.port:
+ node.redis_connection = redis_mock_node
+
+ assert r.get(key) == "MOCK_OK"
+ new_primary = r.get_node_from_key(key, replica=False)
+ assert new_primary.host == replica_host
+ assert new_primary.port == replica_port
+ assert r.get_node(primary.host, primary.port).server_type == REPLICA
+
def test_moved_redirection(self, request):
"""
Test that the client handles MOVED response.
@@ -691,6 +747,50 @@ class TestRedisClusterObj:
cur_node = r.get_node(node_name=node_name)
assert conn == r.get_redis_connection(cur_node)
+ def test_cluster_get_set_retry_object(self, request):
+ retry = Retry(NoBackoff(), 2)
+ r = _get_client(RedisCluster, request, retry=retry)
+ assert r.get_retry()._retries == retry._retries
+ assert isinstance(r.get_retry()._backoff, NoBackoff)
+ for node in r.get_nodes():
+ assert node.redis_connection.get_retry()._retries == retry._retries
+ assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff)
+ rand_node = r.get_random_node()
+ existing_conn = rand_node.redis_connection.connection_pool.get_connection("_")
+ # Change retry policy
+ new_retry = Retry(ExponentialBackoff(), 3)
+ r.set_retry(new_retry)
+ assert r.get_retry()._retries == new_retry._retries
+ assert isinstance(r.get_retry()._backoff, ExponentialBackoff)
+ for node in r.get_nodes():
+ assert node.redis_connection.get_retry()._retries == new_retry._retries
+ assert isinstance(
+ node.redis_connection.get_retry()._backoff, ExponentialBackoff
+ )
+ assert existing_conn.retry._retries == new_retry._retries
+ new_conn = rand_node.redis_connection.connection_pool.get_connection("_")
+ assert new_conn.retry._retries == new_retry._retries
+
+ def test_cluster_retry_object(self, r) -> None:
+ # Test default retry
+ retry = r.get_connection_kwargs().get("retry")
+ assert isinstance(retry, Retry)
+ assert retry._retries == 0
+ assert isinstance(retry._backoff, type(default_backoff()))
+ node1 = r.get_node("127.0.0.1", 16379).redis_connection
+ node2 = r.get_node("127.0.0.1", 16380).redis_connection
+ assert node1.get_retry()._retries == node2.get_retry()._retries
+
+ # Test custom retry
+ retry = Retry(ExponentialBackoff(10, 5), 5)
+ rc_custom_retry = RedisCluster("127.0.0.1", 16379, retry=retry)
+ assert (
+ rc_custom_retry.get_node("127.0.0.1", 16379)
+ .redis_connection.get_retry()
+ ._retries
+ == retry._retries
+ )
+
@pytest.mark.onlycluster
class TestClusterRedisCommands:
diff --git a/tests/test_retry.py b/tests/test_retry.py
index f844fd0..3cfea5c 100644
--- a/tests/test_retry.py
+++ b/tests/test_retry.py
@@ -2,7 +2,7 @@ from unittest.mock import patch
import pytest
-from redis.backoff import NoBackoff
+from redis.backoff import ExponentialBackoff, NoBackoff
from redis.client import Redis
from redis.connection import Connection, UnixDomainSocketConnection
from redis.exceptions import (
@@ -203,3 +203,17 @@ class TestRedisClientRetry:
r.get("foo")
finally:
assert parse_response.call_count == retries + 1
+
+ def test_get_set_retry_object(self, request):
+ retry = Retry(NoBackoff(), 2)
+ r = _get_client(Redis, request, retry_on_timeout=True, retry=retry)
+ exist_conn = r.connection_pool.get_connection("_")
+ assert r.get_retry()._retries == retry._retries
+ assert isinstance(r.get_retry()._backoff, NoBackoff)
+ new_retry_policy = Retry(ExponentialBackoff(), 3)
+ r.set_retry(new_retry_policy)
+ assert r.get_retry()._retries == new_retry_policy._retries
+ assert isinstance(r.get_retry()._backoff, ExponentialBackoff)
+ assert exist_conn.retry._retries == new_retry_policy._retries
+ new_conn = r.connection_pool.get_connection("_")
+ assert new_conn.retry._retries == new_retry_policy._retries