path: root/tests/test_asyncio/
diff options
authorUtkarsh Gupta <>2022-05-30 21:45:45 +0530
committerGitHub <>2022-05-30 19:15:45 +0300
commitbac33d4a92892ca7982b461347151bff5a661f0d (patch)
tree976d5dafcc2b3a1c4e129e1da439f1b7bdacacbd /tests/test_asyncio/
parentc54dfa49dda6a7b3389dc230726293af3ffc68a3 (diff)
async_cluster: add pipeline support (#2199)
Co-authored-by: dvora-h <>
Diffstat (limited to 'tests/test_asyncio/')
1 files changed, 253 insertions, 1 deletions
diff --git a/tests/test_asyncio/ b/tests/test_asyncio/
index 123adc8..0c676cb 100644
--- a/tests/test_asyncio/
+++ b/tests/test_asyncio/
@@ -19,7 +19,7 @@ from _pytest.fixtures import FixtureRequest, SubRequest
from redis.asyncio import Connection, RedisCluster
from redis.asyncio.cluster import ClusterNode, NodesManager
from redis.asyncio.parser import CommandsParser
-from redis.cluster import PRIMARY, REPLICA, get_node_name
+from redis.cluster import PIPELINE_BLOCKED_COMMANDS, PRIMARY, REPLICA, get_node_name
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
@@ -129,6 +129,16 @@ def mock_node_resp(node: ClusterNode, response: Any) -> ClusterNode:
return node
+def mock_node_resp_exc(node: ClusterNode, exc: Exception) -> ClusterNode:
+ connection = mock.AsyncMock()
+ connection.is_connected = True
+ connection.read_response_without_lock.side_effect = exc
+ while node._free:
+ node._free.pop()
+ node._free.append(connection)
+ return node
def mock_all_nodes_resp(rc: RedisCluster, response: Any) -> RedisCluster:
for node in rc.get_nodes():
mock_node_resp(node, response)
@@ -2218,3 +2228,245 @@ class TestNodesManager:
async with RedisCluster(startup_nodes=[node_1, node_2]) as rc:
assert rc.get_node(host=default_host, port=7001) is not None
assert rc.get_node(host=default_host, port=7002) is not None
+class TestClusterPipeline:
+ """Tests for the ClusterPipeline class."""
+ async def test_blocked_arguments(self, r: RedisCluster) -> None:
+ """Test handling for blocked pipeline arguments."""
+ with pytest.raises(RedisClusterException) as ex:
+ r.pipeline(transaction=True)
+ assert str(ex.value) == "transaction is deprecated in cluster mode"
+ with pytest.raises(RedisClusterException) as ex:
+ r.pipeline(shard_hint=True)
+ assert str(ex.value) == "shard_hint is deprecated in cluster mode"
+ async def test_blocked_methods(self, r: RedisCluster) -> None:
+ """Test handling for blocked pipeline commands."""
+ pipeline = r.pipeline()
+ command = command.replace(" ", "_").lower()
+ if command == "mset_nonatomic":
+ continue
+ with pytest.raises(RedisClusterException) as exc:
+ getattr(pipeline, command)()
+ assert str(exc.value) == (
+ f"ERROR: Calling pipelined function {command} is blocked "
+ "when running redis in cluster mode..."
+ )
+ async def test_empty_stack(self, r: RedisCluster) -> None:
+ """If a pipeline is executed with no commands it should return a empty list."""
+ p = r.pipeline()
+ result = await p.execute()
+ assert result == []
+ async def test_redis_cluster_pipeline(self, r: RedisCluster) -> None:
+ """Test that we can use a pipeline with the RedisCluster class"""
+ result = await (
+ r.pipeline()
+ .set("A", 1)
+ .get("A")
+ .hset("K", "F", "V")
+ .hgetall("K")
+ .mset_nonatomic({"A": 2, "B": 3})
+ .get("A")
+ .get("B")
+ .delete("A", "B", "K")
+ .execute()
+ )
+ assert result == [True, b"1", 1, {b"F": b"V"}, True, True, b"2", b"3", 1, 1, 1]
+ async def test_multi_key_operation_with_a_single_slot(
+ self, r: RedisCluster
+ ) -> None:
+ """Test multi key operation with a single slot."""
+ pipe = r.pipeline()
+ pipe.set("a{foo}", 1)
+ pipe.set("b{foo}", 2)
+ pipe.set("c{foo}", 3)
+ pipe.get("a{foo}")
+ pipe.get("b{foo}")
+ pipe.get("c{foo}")
+ res = await pipe.execute()
+ assert res == [True, True, True, b"1", b"2", b"3"]
+ async def test_multi_key_operation_with_multi_slots(self, r: RedisCluster) -> None:
+ """Test multi key operation with more than one slot."""
+ pipe = r.pipeline()
+ pipe.set("a{foo}", 1)
+ pipe.set("b{foo}", 2)
+ pipe.set("c{foo}", 3)
+ pipe.set("bar", 4)
+ pipe.set("bazz", 5)
+ pipe.get("a{foo}")
+ pipe.get("b{foo}")
+ pipe.get("c{foo}")
+ pipe.get("bar")
+ pipe.get("bazz")
+ res = await pipe.execute()
+ assert res == [True, True, True, True, True, b"1", b"2", b"3", b"4", b"5"]
+ async def test_cluster_down_error(self, r: RedisCluster) -> None:
+ """
+ Test that the pipeline retries cluster_error_retry_attempts times before raising
+ an error.
+ """
+ key = "foo"
+ node = r.get_node_from_key(key, False)
+ parse_response_orig = node.parse_response
+ with mock.patch.object(
+ ClusterNode, "parse_response", autospec=True
+ ) as parse_response_mock:
+ async def parse_response(
+ self, connection: Connection, command: str, **kwargs: Any
+ ) -> Any:
+ if command == "GET":
+ raise ClusterDownError("error")
+ return await parse_response_orig(connection, command, **kwargs)
+ parse_response_mock.side_effect = parse_response
+ # For each ClusterDownError, we launch 4 commands: INFO, CLUSTER SLOTS,
+ # COMMAND, GET. Before any errors, the first 3 commands are already run
+ async with r.pipeline() as pipe:
+ with pytest.raises(ClusterDownError):
+ await pipe.get(key).execute()
+ assert (
+ node.parse_response.await_count
+ == 4 * r.cluster_error_retry_attempts - 3
+ )
+ async def test_connection_error_not_raised(self, r: RedisCluster) -> None:
+ """Test ConnectionError handling with raise_on_error=False."""
+ key = "foo"
+ node = r.get_node_from_key(key, False)
+ parse_response_orig = node.parse_response
+ with mock.patch.object(
+ ClusterNode, "parse_response", autospec=True
+ ) as parse_response_mock:
+ async def parse_response(
+ self, connection: Connection, command: str, **kwargs: Any
+ ) -> Any:
+ if command == "GET":
+ raise ConnectionError("error")
+ return await parse_response_orig(connection, command, **kwargs)
+ parse_response_mock.side_effect = parse_response
+ async with r.pipeline() as pipe:
+ res = await pipe.get(key).get(key).execute(raise_on_error=False)
+ assert node.parse_response.await_count
+ assert isinstance(res[0], ConnectionError)
+ async def test_connection_error_raised(self, r: RedisCluster) -> None:
+ """Test ConnectionError handling with raise_on_error=True."""
+ key = "foo"
+ node = r.get_node_from_key(key, False)
+ parse_response_orig = node.parse_response
+ with mock.patch.object(
+ ClusterNode, "parse_response", autospec=True
+ ) as parse_response_mock:
+ async def parse_response(
+ self, connection: Connection, command: str, **kwargs: Any
+ ) -> Any:
+ if command == "GET":
+ raise ConnectionError("error")
+ return await parse_response_orig(connection, command, **kwargs)
+ parse_response_mock.side_effect = parse_response
+ async with r.pipeline() as pipe:
+ with pytest.raises(ConnectionError):
+ await pipe.get(key).get(key).execute(raise_on_error=True)
+ async def test_asking_error(self, r: RedisCluster) -> None:
+ """Test AskError handling."""
+ key = "foo"
+ first_node = r.get_node_from_key(key, False)
+ ask_node = None
+ for node in r.get_nodes():
+ if node != first_node:
+ ask_node = node
+ break
+ ask_msg = f"{r.keyslot(key)} {}:{ask_node.port}"
+ async with r.pipeline() as pipe:
+ mock_node_resp_exc(first_node, AskError(ask_msg))
+ mock_node_resp(ask_node, "MOCK_OK")
+ res = await pipe.get(key).execute()
+ assert first_node._free.pop().read_response_without_lock.await_count
+ assert ask_node._free.pop().read_response_without_lock.await_count
+ assert res == ["MOCK_OK"]
+ async def test_moved_redirection_on_slave_with_default(
+ self, r: RedisCluster
+ ) -> None:
+ """Test MovedError handling."""
+ key = "foo"
+ await r.set("foo", "bar")
+ # set read_from_replicas to True
+ r.read_from_replicas = True
+ primary = r.get_node_from_key(key, False)
+ moved_error = f"{r.keyslot(key)} {}:{primary.port}"
+ parse_response_orig = primary.parse_response
+ with mock.patch.object(
+ ClusterNode, "parse_response", autospec=True
+ ) as parse_response_mock:
+ async def parse_response(
+ self, connection: Connection, command: str, **kwargs: Any
+ ) -> Any:
+ if (
+ command == "GET"
+ and !=
+ and self.port != primary.port
+ ):
+ raise MovedError(moved_error)
+ return await parse_response_orig(connection, command, **kwargs)
+ parse_response_mock.side_effect = parse_response
+ async with r.pipeline() as readwrite_pipe:
+ assert r.reinitialize_counter == 0
+ readwrite_pipe.get(key).get(key)
+ assert r.reinitialize_counter == 0
+ assert await readwrite_pipe.execute() == [b"bar", b"bar"]
+ async def test_readonly_pipeline_from_readonly_client(
+ self, r: RedisCluster
+ ) -> None:
+ """Test that the pipeline uses replicas for read_from_replicas clients."""
+ # Create a cluster with reading from replications
+ r.read_from_replicas = True
+ key = "bar"
+ await r.set(key, "foo")
+ async with r.pipeline() as pipe:
+ mock_all_nodes_resp(r, "MOCK_OK")
+ assert await pipe.get(key).get(key).execute() == ["MOCK_OK", "MOCK_OK"]
+ slot_nodes = r.nodes_manager.slots_cache[r.keyslot(key)]
+ executed_on_replica = False
+ for node in slot_nodes:
+ if node.server_type == REPLICA:
+ if node._free.pop().read_response_without_lock.await_count:
+ executed_on_replica = True
+ break
+ assert executed_on_replica