diff options
author | Bar Shaul <88437685+barshaul@users.noreply.github.com> | 2022-11-10 13:16:49 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-10 13:16:49 +0200 |
commit | 67214cc3eaa7890c87e45550b8320779f954094b (patch) | |
tree | 3bca8b8913224255bdf72de79265ca0441cecb1c /tests | |
parent | bb06ccd52924800ac501d17c8a42038c8e5c5770 (diff) | |
download | redis-py-67214cc3eaa7890c87e45550b8320779f954094b.tar.gz |
Failover handling improvements for RedisCluster and Async RedisCluster (#2377)
* Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements
* Fixed linters
* Type fixes
* Added to CHANGES
* Added getter and setter for the client's retry object and added more tests
* Fixed linters
* Fixed test
* Fixed test_client_kill test
* Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries
* Fixing linters
* Reverting deletion of connection_error_retry_attempts to maintain backward compatibility
* Updating retry object for existing and new connections
* Changed the default value of reinitialize_steps from 10 to 5
* fix review comments
Co-authored-by: Chayim <chayim@users.noreply.github.com>
Co-authored-by: dvora-h <dvora.heller@redis.com>
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_asyncio/test_cluster.py | 78 | ||||
-rw-r--r-- | tests/test_asyncio/test_retry.py | 22 | ||||
-rw-r--r-- | tests/test_cluster.py | 100 | ||||
-rw-r--r-- | tests/test_retry.py | 16 |
4 files changed, 213 insertions, 3 deletions
diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 27f1190..38bcaf6 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -13,6 +13,8 @@ from _pytest.fixtures import FixtureRequest from redis.asyncio.cluster import ClusterNode, NodesManager, RedisCluster from redis.asyncio.connection import Connection, SSLConnection from redis.asyncio.parser import CommandsParser +from redis.asyncio.retry import Retry +from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff from redis.cluster import PIPELINE_BLOCKED_COMMANDS, PRIMARY, REPLICA, get_node_name from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( @@ -247,6 +249,76 @@ class TestRedisClusterObj: ] ) + async def test_cluster_set_get_retry_object(self, request: FixtureRequest): + retry = Retry(NoBackoff(), 2) + url = request.config.getoption("--redis-url") + async with RedisCluster.from_url(url, retry=retry) as r: + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + for node in r.get_nodes(): + n_retry = node.connection_kwargs.get("retry") + assert n_retry is not None + assert n_retry._retries == retry._retries + assert isinstance(n_retry._backoff, NoBackoff) + rand_cluster_node = r.get_random_node() + existing_conn = rand_cluster_node.acquire_connection() + # Change retry policy + new_retry = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry) + assert r.get_retry()._retries == new_retry._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + for node in r.get_nodes(): + n_retry = node.connection_kwargs.get("retry") + assert n_retry is not None + assert n_retry._retries == new_retry._retries + assert isinstance(n_retry._backoff, ExponentialBackoff) + assert existing_conn.retry._retries == new_retry._retries + new_conn = rand_cluster_node.acquire_connection() + assert new_conn.retry._retries == new_retry._retries + + async def test_cluster_retry_object(self, request: FixtureRequest) -> None: + url = request.config.getoption("--redis-url") + async with RedisCluster.from_url(url) as rc_default: + # Test default retry + retry = rc_default.connection_kwargs.get("retry") + assert isinstance(retry, Retry) + assert retry._retries == 3 + assert isinstance(retry._backoff, type(default_backoff())) + assert rc_default.get_node("127.0.0.1", 16379).connection_kwargs.get( + "retry" + ) == rc_default.get_node("127.0.0.1", 16380).connection_kwargs.get("retry") + + retry = Retry(ExponentialBackoff(10, 5), 5) + async with RedisCluster.from_url(url, retry=retry) as rc_custom_retry: + # Test custom retry + assert ( + rc_custom_retry.get_node("127.0.0.1", 16379).connection_kwargs.get( + "retry" + ) + == retry + ) + + async with RedisCluster.from_url( + url, connection_error_retry_attempts=0 + ) as rc_no_retries: + # Test no connection retries + assert ( + rc_no_retries.get_node("127.0.0.1", 16379).connection_kwargs.get( + "retry" + ) + is None + ) + + async with RedisCluster.from_url( + url, retry=Retry(NoBackoff(), 0) + ) as rc_no_retries: + assert ( + rc_no_retries.get_node("127.0.0.1", 16379) + .connection_kwargs.get("retry") + ._retries + == 0 + ) + async def test_empty_startup_nodes(self) -> None: """ Test that exception is raised when empty providing empty startup_nodes @@ -1289,8 +1361,11 @@ class TestClusterRedisCommands: assert "addr" in info @skip_if_server_version_lt("2.6.9") - async def test_client_kill(self, r: RedisCluster, r2: RedisCluster) -> None: + async def test_client_kill( + self, r: RedisCluster, create_redis: Callable[..., RedisCluster] + ) -> None: node = r.get_primaries()[0] + r2 = await create_redis(cls=RedisCluster, flushdb=False) await r.client_setname("redis-py-c1", target_nodes="all") await r2.client_setname("redis-py-c2", target_nodes="all") clients = [ @@ -1311,6 +1386,7 @@ class TestClusterRedisCommands: ] assert len(clients) == 1 assert clients[0].get("name") == "redis-py-c1" + await r2.close() @skip_if_server_version_lt("2.6.0") async def test_cluster_bitop_not_empty_string(self, r: RedisCluster) -> None: diff --git a/tests/test_asyncio/test_retry.py b/tests/test_asyncio/test_retry.py index 38e353b..86e6ddf 100644 --- a/tests/test_asyncio/test_retry.py +++ b/tests/test_asyncio/test_retry.py @@ -1,8 +1,9 @@ import pytest +from redis.asyncio import Redis from redis.asyncio.connection import Connection, UnixDomainSocketConnection from redis.asyncio.retry import Retry -from redis.backoff import AbstractBackoff, NoBackoff +from redis.backoff import AbstractBackoff, ExponentialBackoff, NoBackoff from redis.exceptions import ConnectionError, TimeoutError @@ -114,3 +115,22 @@ class TestRetry: assert self.actual_attempts == 5 assert self.actual_failures == 5 + + +class TestRedisClientRetry: + "Test the Redis client behavior with retries" + + async def test_get_set_retry_object(self, request): + retry = Retry(NoBackoff(), 2) + url = request.config.getoption("--redis-url") + r = await Redis.from_url(url, retry_on_timeout=True, retry=retry) + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + new_retry_policy = Retry(ExponentialBackoff(), 3) + exiting_conn = await r.connection_pool.get_connection("_") + r.set_retry(new_retry_policy) + assert r.get_retry()._retries == new_retry_policy._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + assert exiting_conn.retry._retries == new_retry_policy._retries + new_conn = await r.connection_pool.get_connection("_") + assert new_conn.retry._retries == new_retry_policy._retries diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 5652673..d18fbbb 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -7,6 +7,7 @@ from unittest.mock import DEFAULT, Mock, call, patch import pytest from redis import Redis +from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff from redis.cluster import ( PRIMARY, REDIS_CLUSTER_HASH_SLOTS, @@ -31,6 +32,7 @@ from redis.exceptions import ( ResponseError, TimeoutError, ) +from redis.retry import Retry from redis.utils import str_if_bytes from tests.test_pubsub import wait_for_message @@ -358,6 +360,60 @@ class TestRedisClusterObj: assert r.execute_command("SET", "foo", "bar") == "MOCK_OK" + def test_handling_cluster_failover_to_a_replica(self, r): + # Set the key we'll test for + key = "key" + r.set("key", "value") + primary = r.get_node_from_key(key, replica=False) + assert str_if_bytes(r.get("key")) == "value" + # Get the current output of cluster slots + cluster_slots = primary.redis_connection.execute_command("CLUSTER SLOTS") + replica_host = "" + replica_port = 0 + # Replace one of the replicas to be the new primary based on the + # cluster slots output + for slot_range in cluster_slots: + primary_port = slot_range[2][1] + if primary_port == primary.port: + if len(slot_range) <= 3: + # cluster doesn't have a replica, return + return + replica_host = str_if_bytes(slot_range[3][0]) + replica_port = slot_range[3][1] + # replace replica and primary in the cluster slots output + tmp_node = slot_range[2] + slot_range[2] = slot_range[3] + slot_range[3] = tmp_node + break + + def raise_connection_error(): + raise ConnectionError("error") + + def mock_execute_command(*_args, **_kwargs): + if _args[0] == "CLUSTER SLOTS": + return cluster_slots + else: + raise Exception("Failed to mock cluster slots") + + # Mock connection error for the current primary + mock_node_resp_func(primary, raise_connection_error) + primary.redis_connection.set_retry(Retry(NoBackoff(), 1)) + + # Mock the cluster slots response for all other nodes + redis_mock_node = Mock() + redis_mock_node.execute_command.side_effect = mock_execute_command + # Mock response value for all other commands + redis_mock_node.parse_response.return_value = "MOCK_OK" + for node in r.get_nodes(): + if node.port != primary.port: + node.redis_connection = redis_mock_node + + assert r.get(key) == "MOCK_OK" + new_primary = r.get_node_from_key(key, replica=False) + assert new_primary.host == replica_host + assert new_primary.port == replica_port + assert r.get_node(primary.host, primary.port).server_type == REPLICA + def test_moved_redirection(self, request): """ Test that the client handles MOVED response. @@ -691,6 +747,50 @@ class TestRedisClusterObj: cur_node = r.get_node(node_name=node_name) assert conn == r.get_redis_connection(cur_node) + def test_cluster_get_set_retry_object(self, request): + retry = Retry(NoBackoff(), 2) + r = _get_client(RedisCluster, request, retry=retry) + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + for node in r.get_nodes(): + assert node.redis_connection.get_retry()._retries == retry._retries + assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff) + rand_node = r.get_random_node() + existing_conn = rand_node.redis_connection.connection_pool.get_connection("_") + # Change retry policy + new_retry = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry) + assert r.get_retry()._retries == new_retry._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + for node in r.get_nodes(): + assert node.redis_connection.get_retry()._retries == new_retry._retries + assert isinstance( + node.redis_connection.get_retry()._backoff, ExponentialBackoff + ) + assert existing_conn.retry._retries == new_retry._retries + new_conn = rand_node.redis_connection.connection_pool.get_connection("_") + assert new_conn.retry._retries == new_retry._retries + + def test_cluster_retry_object(self, r) -> None: + # Test default retry + retry = r.get_connection_kwargs().get("retry") + assert isinstance(retry, Retry) + assert retry._retries == 0 + assert isinstance(retry._backoff, type(default_backoff())) + node1 = r.get_node("127.0.0.1", 16379).redis_connection + node2 = r.get_node("127.0.0.1", 16380).redis_connection + assert node1.get_retry()._retries == node2.get_retry()._retries + + # Test custom retry + retry = Retry(ExponentialBackoff(10, 5), 5) + rc_custom_retry = RedisCluster("127.0.0.1", 16379, retry=retry) + assert ( + rc_custom_retry.get_node("127.0.0.1", 16379) + .redis_connection.get_retry() + ._retries + == retry._retries + ) + @pytest.mark.onlycluster class TestClusterRedisCommands: diff --git a/tests/test_retry.py b/tests/test_retry.py index f844fd0..3cfea5c 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -2,7 +2,7 @@ from unittest.mock import patch import pytest -from redis.backoff import NoBackoff +from redis.backoff import ExponentialBackoff, NoBackoff from redis.client import Redis from redis.connection import Connection, UnixDomainSocketConnection from redis.exceptions import ( @@ -203,3 +203,17 @@ class TestRedisClientRetry: r.get("foo") finally: assert parse_response.call_count == retries + 1 + + def test_get_set_retry_object(self, request): + retry = Retry(NoBackoff(), 2) + r = _get_client(Redis, request, retry_on_timeout=True, retry=retry) + exist_conn = r.connection_pool.get_connection("_") + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + new_retry_policy = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry_policy) + assert r.get_retry()._retries == new_retry_policy._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + assert exist_conn.retry._retries == new_retry_policy._retries + new_conn = r.connection_pool.get_connection("_") + assert new_conn.retry._retries == new_retry_policy._retries |