summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorBar Shaul <88437685+barshaul@users.noreply.github.com>2022-11-10 13:16:49 +0200
committerGitHub <noreply@github.com>2022-11-10 13:16:49 +0200
commit67214cc3eaa7890c87e45550b8320779f954094b (patch)
tree3bca8b8913224255bdf72de79265ca0441cecb1c /tests
parentbb06ccd52924800ac501d17c8a42038c8e5c5770 (diff)
downloadredis-py-67214cc3eaa7890c87e45550b8320779f954094b.tar.gz
Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
* Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements * Fixed linters * Type fixes * Added to CHANGES * Added getter and setter for the client's retry object and added more tests * Fixed linters * Fixed test * Fixed test_client_kill test * Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries * Fixing linters * Reverting deletion of connection_error_retry_attempts to maintain backward compatibility * Updating retry object for existing and new connections * Changed the default value of reinitialize_steps from 10 to 5 * fix review comments Co-authored-by: Chayim <chayim@users.noreply.github.com> Co-authored-by: dvora-h <dvora.heller@redis.com> Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
Diffstat (limited to 'tests')
-rw-r--r--tests/test_asyncio/test_cluster.py78
-rw-r--r--tests/test_asyncio/test_retry.py22
-rw-r--r--tests/test_cluster.py100
-rw-r--r--tests/test_retry.py16
4 files changed, 213 insertions, 3 deletions
diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py
index 27f1190..38bcaf6 100644
--- a/tests/test_asyncio/test_cluster.py
+++ b/tests/test_asyncio/test_cluster.py
@@ -13,6 +13,8 @@ from _pytest.fixtures import FixtureRequest
from redis.asyncio.cluster import ClusterNode, NodesManager, RedisCluster
from redis.asyncio.connection import Connection, SSLConnection
from redis.asyncio.parser import CommandsParser
+from redis.asyncio.retry import Retry
+from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
from redis.cluster import PIPELINE_BLOCKED_COMMANDS, PRIMARY, REPLICA, get_node_name
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
@@ -247,6 +249,76 @@ class TestRedisClusterObj:
]
)
+ async def test_cluster_set_get_retry_object(self, request: FixtureRequest):
+ retry = Retry(NoBackoff(), 2)
+ url = request.config.getoption("--redis-url")
+ async with RedisCluster.from_url(url, retry=retry) as r:
+ assert r.get_retry()._retries == retry._retries
+ assert isinstance(r.get_retry()._backoff, NoBackoff)
+ for node in r.get_nodes():
+ n_retry = node.connection_kwargs.get("retry")
+ assert n_retry is not None
+ assert n_retry._retries == retry._retries
+ assert isinstance(n_retry._backoff, NoBackoff)
+ rand_cluster_node = r.get_random_node()
+ existing_conn = rand_cluster_node.acquire_connection()
+ # Change retry policy
+ new_retry = Retry(ExponentialBackoff(), 3)
+ r.set_retry(new_retry)
+ assert r.get_retry()._retries == new_retry._retries
+ assert isinstance(r.get_retry()._backoff, ExponentialBackoff)
+ for node in r.get_nodes():
+ n_retry = node.connection_kwargs.get("retry")
+ assert n_retry is not None
+ assert n_retry._retries == new_retry._retries
+ assert isinstance(n_retry._backoff, ExponentialBackoff)
+ assert existing_conn.retry._retries == new_retry._retries
+ new_conn = rand_cluster_node.acquire_connection()
+ assert new_conn.retry._retries == new_retry._retries
+
+ async def test_cluster_retry_object(self, request: FixtureRequest) -> None:
+ url = request.config.getoption("--redis-url")
+ async with RedisCluster.from_url(url) as rc_default:
+ # Test default retry
+ retry = rc_default.connection_kwargs.get("retry")
+ assert isinstance(retry, Retry)
+ assert retry._retries == 3
+ assert isinstance(retry._backoff, type(default_backoff()))
+ assert rc_default.get_node("127.0.0.1", 16379).connection_kwargs.get(
+ "retry"
+ ) == rc_default.get_node("127.0.0.1", 16380).connection_kwargs.get("retry")
+
+ retry = Retry(ExponentialBackoff(10, 5), 5)
+ async with RedisCluster.from_url(url, retry=retry) as rc_custom_retry:
+ # Test custom retry
+ assert (
+ rc_custom_retry.get_node("127.0.0.1", 16379).connection_kwargs.get(
+ "retry"
+ )
+ == retry
+ )
+
+ async with RedisCluster.from_url(
+ url, connection_error_retry_attempts=0
+ ) as rc_no_retries:
+ # Test no connection retries
+ assert (
+ rc_no_retries.get_node("127.0.0.1", 16379).connection_kwargs.get(
+ "retry"
+ )
+ is None
+ )
+
+ async with RedisCluster.from_url(
+ url, retry=Retry(NoBackoff(), 0)
+ ) as rc_no_retries:
+ assert (
+ rc_no_retries.get_node("127.0.0.1", 16379)
+ .connection_kwargs.get("retry")
+ ._retries
+ == 0
+ )
+
async def test_empty_startup_nodes(self) -> None:
"""
Test that exception is raised when empty providing empty startup_nodes
@@ -1289,8 +1361,11 @@ class TestClusterRedisCommands:
assert "addr" in info
@skip_if_server_version_lt("2.6.9")
- async def test_client_kill(self, r: RedisCluster, r2: RedisCluster) -> None:
+ async def test_client_kill(
+ self, r: RedisCluster, create_redis: Callable[..., RedisCluster]
+ ) -> None:
node = r.get_primaries()[0]
+ r2 = await create_redis(cls=RedisCluster, flushdb=False)
await r.client_setname("redis-py-c1", target_nodes="all")
await r2.client_setname("redis-py-c2", target_nodes="all")
clients = [
@@ -1311,6 +1386,7 @@ class TestClusterRedisCommands:
]
assert len(clients) == 1
assert clients[0].get("name") == "redis-py-c1"
+ await r2.close()
@skip_if_server_version_lt("2.6.0")
async def test_cluster_bitop_not_empty_string(self, r: RedisCluster) -> None:
diff --git a/tests/test_asyncio/test_retry.py b/tests/test_asyncio/test_retry.py
index 38e353b..86e6ddf 100644
--- a/tests/test_asyncio/test_retry.py
+++ b/tests/test_asyncio/test_retry.py
@@ -1,8 +1,9 @@
import pytest
+from redis.asyncio import Redis
from redis.asyncio.connection import Connection, UnixDomainSocketConnection
from redis.asyncio.retry import Retry
-from redis.backoff import AbstractBackoff, NoBackoff
+from redis.backoff import AbstractBackoff, ExponentialBackoff, NoBackoff
from redis.exceptions import ConnectionError, TimeoutError
@@ -114,3 +115,22 @@ class TestRetry:
assert self.actual_attempts == 5
assert self.actual_failures == 5
+
+
+class TestRedisClientRetry:
+ "Test the Redis client behavior with retries"
+
+ async def test_get_set_retry_object(self, request):
+ retry = Retry(NoBackoff(), 2)
+ url = request.config.getoption("--redis-url")
+ r = await Redis.from_url(url, retry_on_timeout=True, retry=retry)
+ assert r.get_retry()._retries == retry._retries
+ assert isinstance(r.get_retry()._backoff, NoBackoff)
+ new_retry_policy = Retry(ExponentialBackoff(), 3)
+ exiting_conn = await r.connection_pool.get_connection("_")
+ r.set_retry(new_retry_policy)
+ assert r.get_retry()._retries == new_retry_policy._retries
+ assert isinstance(r.get_retry()._backoff, ExponentialBackoff)
+ assert exiting_conn.retry._retries == new_retry_policy._retries
+ new_conn = await r.connection_pool.get_connection("_")
+ assert new_conn.retry._retries == new_retry_policy._retries
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 5652673..d18fbbb 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -7,6 +7,7 @@ from unittest.mock import DEFAULT, Mock, call, patch
import pytest
from redis import Redis
+from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
from redis.cluster import (
PRIMARY,
REDIS_CLUSTER_HASH_SLOTS,
@@ -31,6 +32,7 @@ from redis.exceptions import (
ResponseError,
TimeoutError,
)
+from redis.retry import Retry
from redis.utils import str_if_bytes
from tests.test_pubsub import wait_for_message
@@ -358,6 +360,60 @@ class TestRedisClusterObj:
assert r.execute_command("SET", "foo", "bar") == "MOCK_OK"
+ def test_handling_cluster_failover_to_a_replica(self, r):
+ # Set the key we'll test for
+ key = "key"
+ r.set("key", "value")
+ primary = r.get_node_from_key(key, replica=False)
+ assert str_if_bytes(r.get("key")) == "value"
+ # Get the current output of cluster slots
+ cluster_slots = primary.redis_connection.execute_command("CLUSTER SLOTS")
+ replica_host = ""
+ replica_port = 0
+ # Replace one of the replicas to be the new primary based on the
+ # cluster slots output
+ for slot_range in cluster_slots:
+ primary_port = slot_range[2][1]
+ if primary_port == primary.port:
+ if len(slot_range) <= 3:
+ # cluster doesn't have a replica, return
+ return
+ replica_host = str_if_bytes(slot_range[3][0])
+ replica_port = slot_range[3][1]
+ # replace replica and primary in the cluster slots output
+ tmp_node = slot_range[2]
+ slot_range[2] = slot_range[3]
+ slot_range[3] = tmp_node
+ break
+
+ def raise_connection_error():
+ raise ConnectionError("error")
+
+ def mock_execute_command(*_args, **_kwargs):
+ if _args[0] == "CLUSTER SLOTS":
+ return cluster_slots
+ else:
+ raise Exception("Failed to mock cluster slots")
+
+ # Mock connection error for the current primary
+ mock_node_resp_func(primary, raise_connection_error)
+ primary.redis_connection.set_retry(Retry(NoBackoff(), 1))
+
+ # Mock the cluster slots response for all other nodes
+ redis_mock_node = Mock()
+ redis_mock_node.execute_command.side_effect = mock_execute_command
+ # Mock response value for all other commands
+ redis_mock_node.parse_response.return_value = "MOCK_OK"
+ for node in r.get_nodes():
+ if node.port != primary.port:
+ node.redis_connection = redis_mock_node
+
+ assert r.get(key) == "MOCK_OK"
+ new_primary = r.get_node_from_key(key, replica=False)
+ assert new_primary.host == replica_host
+ assert new_primary.port == replica_port
+ assert r.get_node(primary.host, primary.port).server_type == REPLICA
+
def test_moved_redirection(self, request):
"""
Test that the client handles MOVED response.
@@ -691,6 +747,50 @@ class TestRedisClusterObj:
cur_node = r.get_node(node_name=node_name)
assert conn == r.get_redis_connection(cur_node)
+ def test_cluster_get_set_retry_object(self, request):
+ retry = Retry(NoBackoff(), 2)
+ r = _get_client(RedisCluster, request, retry=retry)
+ assert r.get_retry()._retries == retry._retries
+ assert isinstance(r.get_retry()._backoff, NoBackoff)
+ for node in r.get_nodes():
+ assert node.redis_connection.get_retry()._retries == retry._retries
+ assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff)
+ rand_node = r.get_random_node()
+ existing_conn = rand_node.redis_connection.connection_pool.get_connection("_")
+ # Change retry policy
+ new_retry = Retry(ExponentialBackoff(), 3)
+ r.set_retry(new_retry)
+ assert r.get_retry()._retries == new_retry._retries
+ assert isinstance(r.get_retry()._backoff, ExponentialBackoff)
+ for node in r.get_nodes():
+ assert node.redis_connection.get_retry()._retries == new_retry._retries
+ assert isinstance(
+ node.redis_connection.get_retry()._backoff, ExponentialBackoff
+ )
+ assert existing_conn.retry._retries == new_retry._retries
+ new_conn = rand_node.redis_connection.connection_pool.get_connection("_")
+ assert new_conn.retry._retries == new_retry._retries
+
+ def test_cluster_retry_object(self, r) -> None:
+ # Test default retry
+ retry = r.get_connection_kwargs().get("retry")
+ assert isinstance(retry, Retry)
+ assert retry._retries == 0
+ assert isinstance(retry._backoff, type(default_backoff()))
+ node1 = r.get_node("127.0.0.1", 16379).redis_connection
+ node2 = r.get_node("127.0.0.1", 16380).redis_connection
+ assert node1.get_retry()._retries == node2.get_retry()._retries
+
+ # Test custom retry
+ retry = Retry(ExponentialBackoff(10, 5), 5)
+ rc_custom_retry = RedisCluster("127.0.0.1", 16379, retry=retry)
+ assert (
+ rc_custom_retry.get_node("127.0.0.1", 16379)
+ .redis_connection.get_retry()
+ ._retries
+ == retry._retries
+ )
+
@pytest.mark.onlycluster
class TestClusterRedisCommands:
diff --git a/tests/test_retry.py b/tests/test_retry.py
index f844fd0..3cfea5c 100644
--- a/tests/test_retry.py
+++ b/tests/test_retry.py
@@ -2,7 +2,7 @@ from unittest.mock import patch
import pytest
-from redis.backoff import NoBackoff
+from redis.backoff import ExponentialBackoff, NoBackoff
from redis.client import Redis
from redis.connection import Connection, UnixDomainSocketConnection
from redis.exceptions import (
@@ -203,3 +203,17 @@ class TestRedisClientRetry:
r.get("foo")
finally:
assert parse_response.call_count == retries + 1
+
+ def test_get_set_retry_object(self, request):
+ retry = Retry(NoBackoff(), 2)
+ r = _get_client(Redis, request, retry_on_timeout=True, retry=retry)
+ exist_conn = r.connection_pool.get_connection("_")
+ assert r.get_retry()._retries == retry._retries
+ assert isinstance(r.get_retry()._backoff, NoBackoff)
+ new_retry_policy = Retry(ExponentialBackoff(), 3)
+ r.set_retry(new_retry_policy)
+ assert r.get_retry()._retries == new_retry_policy._retries
+ assert isinstance(r.get_retry()._backoff, ExponentialBackoff)
+ assert exist_conn.retry._retries == new_retry_policy._retries
+ new_conn = r.connection_pool.get_connection("_")
+ assert new_conn.retry._retries == new_retry_policy._retries