diff options
author | Utkarsh Gupta <utkarshgupta137@gmail.com> | 2022-05-30 21:45:45 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-30 19:15:45 +0300 |
commit | bac33d4a92892ca7982b461347151bff5a661f0d (patch) | |
tree | 976d5dafcc2b3a1c4e129e1da439f1b7bdacacbd /tests/test_asyncio/test_cluster.py | |
parent | c54dfa49dda6a7b3389dc230726293af3ffc68a3 (diff) | |
download | redis-py-bac33d4a92892ca7982b461347151bff5a661f0d.tar.gz |
async_cluster: add pipeline support (#2199)
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
Diffstat (limited to 'tests/test_asyncio/test_cluster.py')
-rw-r--r-- | tests/test_asyncio/test_cluster.py | 254 |
1 files changed, 253 insertions, 1 deletions
diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 123adc8..0c676cb 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -19,7 +19,7 @@ from _pytest.fixtures import FixtureRequest, SubRequest from redis.asyncio import Connection, RedisCluster from redis.asyncio.cluster import ClusterNode, NodesManager from redis.asyncio.parser import CommandsParser -from redis.cluster import PRIMARY, REPLICA, get_node_name +from redis.cluster import PIPELINE_BLOCKED_COMMANDS, PRIMARY, REPLICA, get_node_name from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( AskError, @@ -129,6 +129,16 @@ def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode: return node +def mock_node_resp_exc(node: ClusterNode, exc: Exception) -> ClusterNode: + connection = mock.AsyncMock() + connection.is_connected = True + connection.read_response_without_lock.side_effect = exc + while node._free: + node._free.pop() + node._free.append(connection) + return node + + def mock_all_nodes_resp(rc: RedisCluster, response: Any) -> RedisCluster: for node in rc.get_nodes(): mock_node_resp(node, response) @@ -2218,3 +2228,245 @@ class TestNodesManager: async with RedisCluster(startup_nodes=[node_1, node_2]) as rc: assert rc.get_node(host=default_host, port=7001) is not None assert rc.get_node(host=default_host, port=7002) is not None + + +@pytest.mark.onlycluster +class TestClusterPipeline: + """Tests for the ClusterPipeline class.""" + + async def test_blocked_arguments(self, r: RedisCluster) -> None: + """Test handling for blocked pipeline arguments.""" + with pytest.raises(RedisClusterException) as ex: + r.pipeline(transaction=True) + + assert str(ex.value) == "transaction is deprecated in cluster mode" + + with pytest.raises(RedisClusterException) as ex: + r.pipeline(shard_hint=True) + + assert str(ex.value) == "shard_hint is deprecated in cluster mode" + + async def test_blocked_methods(self, r: RedisCluster) -> None: + """Test handling for blocked pipeline commands.""" + pipeline = r.pipeline() + for command in PIPELINE_BLOCKED_COMMANDS: + command = command.replace(" ", "_").lower() + if command == "mset_nonatomic": + continue + + with pytest.raises(RedisClusterException) as exc: + getattr(pipeline, command)() + + assert str(exc.value) == ( + f"ERROR: Calling pipelined function {command} is blocked " + "when running redis in cluster mode..." + ) + + async def test_empty_stack(self, r: RedisCluster) -> None: + """If a pipeline is executed with no commands it should return a empty list.""" + p = r.pipeline() + result = await p.execute() + assert result == [] + + async def test_redis_cluster_pipeline(self, r: RedisCluster) -> None: + """Test that we can use a pipeline with the RedisCluster class""" + result = await ( + r.pipeline() + .set("A", 1) + .get("A") + .hset("K", "F", "V") + .hgetall("K") + .mset_nonatomic({"A": 2, "B": 3}) + .get("A") + .get("B") + .delete("A", "B", "K") + .execute() + ) + assert result == [True, b"1", 1, {b"F": b"V"}, True, True, b"2", b"3", 1, 1, 1] + + async def test_multi_key_operation_with_a_single_slot( + self, r: RedisCluster + ) -> None: + """Test multi key operation with a single slot.""" + pipe = r.pipeline() + pipe.set("a{foo}", 1) + pipe.set("b{foo}", 2) + pipe.set("c{foo}", 3) + pipe.get("a{foo}") + pipe.get("b{foo}") + pipe.get("c{foo}") + + res = await pipe.execute() + assert res == [True, True, True, b"1", b"2", b"3"] + + async def test_multi_key_operation_with_multi_slots(self, r: RedisCluster) -> None: + """Test multi key operation with more than one slot.""" + pipe = r.pipeline() + pipe.set("a{foo}", 1) + pipe.set("b{foo}", 2) + pipe.set("c{foo}", 3) + pipe.set("bar", 4) + pipe.set("bazz", 5) + pipe.get("a{foo}") + pipe.get("b{foo}") + pipe.get("c{foo}") + pipe.get("bar") + pipe.get("bazz") + res = await pipe.execute() + assert res == [True, True, True, True, True, b"1", b"2", b"3", b"4", b"5"] + + async def test_cluster_down_error(self, r: RedisCluster) -> None: + """ + Test that the pipeline retries cluster_error_retry_attempts times before raising + an error. + """ + key = "foo" + node = r.get_node_from_key(key, False) + + parse_response_orig = node.parse_response + with mock.patch.object( + ClusterNode, "parse_response", autospec=True + ) as parse_response_mock: + + async def parse_response( + self, connection: Connection, command: str, **kwargs: Any + ) -> Any: + if command == "GET": + raise ClusterDownError("error") + return await parse_response_orig(connection, command, **kwargs) + + parse_response_mock.side_effect = parse_response + + # For each ClusterDownError, we launch 4 commands: INFO, CLUSTER SLOTS, + # COMMAND, GET. Before any errors, the first 3 commands are already run + async with r.pipeline() as pipe: + with pytest.raises(ClusterDownError): + await pipe.get(key).execute() + + assert ( + node.parse_response.await_count + == 4 * r.cluster_error_retry_attempts - 3 + ) + + async def test_connection_error_not_raised(self, r: RedisCluster) -> None: + """Test ConnectionError handling with raise_on_error=False.""" + key = "foo" + node = r.get_node_from_key(key, False) + + parse_response_orig = node.parse_response + with mock.patch.object( + ClusterNode, "parse_response", autospec=True + ) as parse_response_mock: + + async def parse_response( + self, connection: Connection, command: str, **kwargs: Any + ) -> Any: + if command == "GET": + raise ConnectionError("error") + return await parse_response_orig(connection, command, **kwargs) + + parse_response_mock.side_effect = parse_response + + async with r.pipeline() as pipe: + res = await pipe.get(key).get(key).execute(raise_on_error=False) + assert node.parse_response.await_count + assert isinstance(res[0], ConnectionError) + + async def test_connection_error_raised(self, r: RedisCluster) -> None: + """Test ConnectionError handling with raise_on_error=True.""" + key = "foo" + node = r.get_node_from_key(key, False) + + parse_response_orig = node.parse_response + with mock.patch.object( + ClusterNode, "parse_response", autospec=True + ) as parse_response_mock: + + async def parse_response( + self, connection: Connection, command: str, **kwargs: Any + ) -> Any: + if command == "GET": + raise ConnectionError("error") + return await parse_response_orig(connection, command, **kwargs) + + parse_response_mock.side_effect = parse_response + + async with r.pipeline() as pipe: + with pytest.raises(ConnectionError): + await pipe.get(key).get(key).execute(raise_on_error=True) + + async def test_asking_error(self, r: RedisCluster) -> None: + """Test AskError handling.""" + key = "foo" + first_node = r.get_node_from_key(key, False) + ask_node = None + for node in r.get_nodes(): + if node != first_node: + ask_node = node + break + ask_msg = f"{r.keyslot(key)} {ask_node.host}:{ask_node.port}" + + async with r.pipeline() as pipe: + mock_node_resp_exc(first_node, AskError(ask_msg)) + mock_node_resp(ask_node, "MOCK_OK") + res = await pipe.get(key).execute() + assert first_node._free.pop().read_response_without_lock.await_count + assert ask_node._free.pop().read_response_without_lock.await_count + assert res == ["MOCK_OK"] + + async def test_moved_redirection_on_slave_with_default( + self, r: RedisCluster + ) -> None: + """Test MovedError handling.""" + key = "foo" + await r.set("foo", "bar") + # set read_from_replicas to True + r.read_from_replicas = True + primary = r.get_node_from_key(key, False) + moved_error = f"{r.keyslot(key)} {primary.host}:{primary.port}" + + parse_response_orig = primary.parse_response + with mock.patch.object( + ClusterNode, "parse_response", autospec=True + ) as parse_response_mock: + + async def parse_response( + self, connection: Connection, command: str, **kwargs: Any + ) -> Any: + if ( + command == "GET" + and self.host != primary.host + and self.port != primary.port + ): + raise MovedError(moved_error) + + return await parse_response_orig(connection, command, **kwargs) + + parse_response_mock.side_effect = parse_response + + async with r.pipeline() as readwrite_pipe: + assert r.reinitialize_counter == 0 + readwrite_pipe.get(key).get(key) + assert r.reinitialize_counter == 0 + assert await readwrite_pipe.execute() == [b"bar", b"bar"] + + async def test_readonly_pipeline_from_readonly_client( + self, r: RedisCluster + ) -> None: + """Test that the pipeline uses replicas for read_from_replicas clients.""" + # Create a cluster with reading from replications + r.read_from_replicas = True + key = "bar" + await r.set(key, "foo") + + async with r.pipeline() as pipe: + mock_all_nodes_resp(r, "MOCK_OK") + assert await pipe.get(key).get(key).execute() == ["MOCK_OK", "MOCK_OK"] + slot_nodes = r.nodes_manager.slots_cache[r.keyslot(key)] + executed_on_replica = False + for node in slot_nodes: + if node.server_type == REPLICA: + if node._free.pop().read_response_without_lock.await_count: + executed_on_replica = True + break + assert executed_on_replica |