summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Utkarsh Gupta <utkarshgupta137@gmail.com> 2022-06-01 17:15:50 +0530
committer: GitHub <noreply@github.com> 2022-06-01 14:45:50 +0300
commit: 05fc203f68c24fbd54c7b338b4610fa62972c326 (patch)
tree: 4550f62dc9c2ce6d2822b9bbaeaf227cb82ad75b
parent: 7880460b72aca49aa5b9512f0995c0d17d884a7d (diff)
download: redis-py-05fc203f68c24fbd54c7b338b4610fa62972c326.tar.gz
async_cluster: optimisations (#2205)
- return true from execute_pipeline if there are any errors
- use todo list to speedup retries
- store initialisation node in CommandsParser object
- add sync context manager for pipeline
- use if/else instead of try/except
- make command a function argument in _determine_nodes & _determine_slot
- add async cluster pipeline benchmark script
-rw-r--r-- benchmarks/cluster_async.py 4
-rw-r--r-- benchmarks/cluster_async_pipeline.py 107
-rw-r--r-- redis/asyncio/cluster.py 165
-rw-r--r-- redis/asyncio/parser.py 23
4 files changed, 206 insertions, 93 deletions
diff --git a/benchmarks/cluster_async.py b/benchmarks/cluster_async.py
index fd2ab46..17dd52b 100644
--- a/benchmarks/cluster_async.py
+++ b/benchmarks/cluster_async.py
@@ -249,8 +249,8 @@ if __name__ == "__main__":
port = 16379
password = None
- count = 1000
- size = 16
+ count = 10000
+ size = 256
asyncio.run(main("asyncio"))
asyncio.run(main("asyncio", gather=False))
diff --git a/benchmarks/cluster_async_pipeline.py b/benchmarks/cluster_async_pipeline.py
new file mode 100644
index 0000000..af45b44
--- /dev/null
+++ b/benchmarks/cluster_async_pipeline.py
@@ -0,0 +1,107 @@
+import asyncio
+import functools
+import time
+
+import aioredis_cluster
+import aredis
+import uvloop
+
+import redis.asyncio as redispy
+
+
+def timer(func):
+ @functools.wraps(func)
+ async def wrapper(*args, **kwargs):
+ tic = time.perf_counter()
+ await func(*args, **kwargs)
+ toc = time.perf_counter()
+ return f"{toc - tic:.4f}"
+
+ return wrapper
+
+
+@timer
+async def warmup(client):
+ await asyncio.gather(
+ *(asyncio.create_task(client.exists(f"bench:warmup_{i}")) for i in range(100))
+ )
+
+
+@timer
+async def run(client):
+ data_str = "a" * size
+ data_int = int("1" * size)
+
+ for i in range(count):
+ with client.pipeline() as pipe:
+ await (
+ pipe.set(f"bench:str_{i}", data_str)
+ .set(f"bench:int_{i}", data_int)
+ .get(f"bench:str_{i}")
+ .get(f"bench:int_{i}")
+ .hset("bench:hset", str(i), data_str)
+ .hget("bench:hset", str(i))
+ .incr("bench:incr")
+ .lpush("bench:lpush", data_int)
+ .lrange("bench:lpush", 0, 300)
+ .lpop("bench:lpush")
+ .execute()
+ )
+
+
+async def main(loop):
+ arc = aredis.StrictRedisCluster(
+ host=host,
+ port=port,
+ password=password,
+ max_connections=2**31,
+ max_connections_per_node=2**31,
+ readonly=False,
+ reinitialize_steps=count,
+ skip_full_coverage_check=True,
+ decode_responses=False,
+ max_idle_time=count,
+ idle_check_interval=count,
+ )
+ print(f"{loop} {await warmup(arc)} aredis")
+ print(await run(arc))
+ arc.connection_pool.disconnect()
+
+ aiorc = await aioredis_cluster.create_redis_cluster(
+ [(host, port)],
+ password=password,
+ state_reload_interval=count,
+ idle_connection_timeout=count,
+ pool_maxsize=2**31,
+ )
+ print(f"{loop} {await warmup(aiorc)} aioredis-cluster")
+ print(await run(aiorc))
+ aiorc.close()
+ await aiorc.wait_closed()
+
+ async with redispy.RedisCluster(
+ host=host,
+ port=port,
+ password=password,
+ reinitialize_steps=count,
+ read_from_replicas=False,
+ decode_responses=False,
+ max_connections=2**31,
+ ) as rca:
+ print(f"{loop} {await warmup(rca)} redispy")
+ print(await run(rca))
+
+
+if __name__ == "__main__":
+ host = "localhost"
+ port = 16379
+ password = None
+
+ count = 10000
+ size = 256
+
+ asyncio.run(main("asyncio"))
+
+ uvloop.install()
+
+ asyncio.run(main("uvloop"))
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 3405a49..b53f848 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -468,9 +468,8 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
return key_slot(k)
async def _determine_nodes(
- self, *args: Any, node_flag: Optional[str] = None
+ self, command: str, *args: Any, node_flag: Optional[str] = None
) -> List["ClusterNode"]:
- command = args[0]
if not node_flag:
# get the nodes group for this command if it was predefined
node_flag = self.command_flags.get(command)
@@ -495,16 +494,15 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
# get the node that holds the key's slot
return [
self.nodes_manager.get_node_from_slot(
- await self._determine_slot(*args),
+ await self._determine_slot(command, *args),
self.read_from_replicas and command in READ_COMMANDS,
)
]
- async def _determine_slot(self, *args: Any) -> int:
- command = args[0]
+ async def _determine_slot(self, command: str, *args: Any) -> int:
if self.command_flags.get(command) == SLOT_ID:
# The command contains the slot ID
- return int(args[1])
+ return int(args[0])
# Get the keys in the command
@@ -516,19 +514,17 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
# - fix: https://github.com/redis/redis/pull/9733
if command in ("EVAL", "EVALSHA"):
# command syntax: EVAL "script body" num_keys ...
- if len(args) <= 2:
- raise RedisClusterException(f"Invalid args in command: {args}")
- num_actual_keys = args[2]
- eval_keys = args[3 : 3 + num_actual_keys]
+ if len(args) < 2:
+ raise RedisClusterException(
+ f"Invalid args in command: {command, *args}"
+ )
+ keys = args[2 : 2 + args[1]]
# if there are 0 keys, that means the script can be run on any node
# so we can just return a random slot
- if not eval_keys:
+ if not keys:
return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
- keys = eval_keys
else:
- keys = await self.commands_parser.get_keys(
- self.nodes_manager.default_node, *args
- )
+ keys = await self.commands_parser.get_keys(command, *args)
if not keys:
# FCALL can call a function with 0 keys, that means the function
# can be run on any node so we can just return a random slot
@@ -848,13 +844,13 @@ class ClusterNode:
self._free.append(connection)
return self._free.popleft()
- else:
- if len(self._connections) < self.max_connections:
- connection = self.connection_class(**self.connection_kwargs)
- self._connections.append(connection)
- return connection
- else:
- raise ConnectionError("Too many connections")
+
+ if len(self._connections) < self.max_connections:
+ connection = self.connection_class(**self.connection_kwargs)
+ self._connections.append(connection)
+ return connection
+
+ raise ConnectionError("Too many connections")
async def parse_response(
self, connection: Connection, command: str, **kwargs: Any
@@ -872,10 +868,10 @@ class ClusterNode:
raise
# Return response
- try:
+ if command in self.response_callbacks:
return self.response_callbacks[command](response, **kwargs)
- except KeyError:
- return response
+
+ return response
async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
# Acquire connection
@@ -891,7 +887,7 @@ class ClusterNode:
# Release connection
self._free.append(connection)
- async def execute_pipeline(self) -> None:
+ async def execute_pipeline(self) -> bool:
# Acquire connection
connection = self.acquire_connection()
@@ -901,17 +897,20 @@ class ClusterNode:
)
# Read responses
- try:
- for cmd in self._command_stack:
- try:
- cmd.result = await self.parse_response(
- connection, cmd.args[0], **cmd.kwargs
- )
- except Exception as e:
- cmd.result = e
- finally:
- # Release connection
- self._free.append(connection)
+ ret = False
+ for cmd in self._command_stack:
+ try:
+ cmd.result = await self.parse_response(
+ connection, cmd.args[0], **cmd.kwargs
+ )
+ except Exception as e:
+ cmd.result = e
+ ret = True
+
+ # Release connection
+ self._free.append(connection)
+
+ return ret
class NodesManager:
@@ -1257,6 +1256,13 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm
def __await__(self) -> Generator[Any, None, "ClusterPipeline"]:
return self.initialize().__await__()
+ def __enter__(self) -> "ClusterPipeline":
+ self._command_stack = []
+ return self
+
+ def __exit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
+ self._command_stack = []
+
def __bool__(self) -> bool:
return bool(self._command_stack)
@@ -1310,6 +1316,7 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm
try:
return await self._execute(
+ self._client,
self._command_stack,
raise_on_error=raise_on_error,
allow_redirections=allow_redirections,
@@ -1331,60 +1338,60 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm
async def _execute(
self,
+ client: "RedisCluster",
stack: List["PipelineCommand"],
raise_on_error: bool = True,
allow_redirections: bool = True,
) -> List[Any]:
- client = self._client
+ todo = [
+ cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception)
+ ]
+
nodes = {}
- for cmd in stack:
- if not cmd.result or isinstance(cmd.result, Exception):
- target_nodes = await client._determine_nodes(*cmd.args)
- if not target_nodes:
- raise RedisClusterException(
- f"No targets were found to execute {cmd.args} command on"
- )
- if len(target_nodes) > 1:
- raise RedisClusterException(
- f"Too many targets for command {cmd.args}"
- )
+ for cmd in todo:
+ target_nodes = await client._determine_nodes(*cmd.args)
+ if not target_nodes:
+ raise RedisClusterException(
+ f"No targets were found to execute {cmd.args} command on"
+ )
+ if len(target_nodes) > 1:
+ raise RedisClusterException(f"Too many targets for command {cmd.args}")
- node = target_nodes[0]
- if node.name not in nodes:
- nodes[node.name] = node
- node._command_stack = []
- node._command_stack.append(cmd)
+ node = target_nodes[0]
+ if node.name not in nodes:
+ nodes[node.name] = node
+ node._command_stack = []
+ node._command_stack.append(cmd)
- await asyncio.gather(
+ errors = await asyncio.gather(
*(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values())
)
- if allow_redirections:
- # send each errored command individually
- for cmd in stack:
- if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
- try:
- cmd.result = await client.execute_command(
- *cmd.args, **cmd.kwargs
+ if any(errors):
+ if allow_redirections:
+ # send each errored command individually
+ for cmd in todo:
+ if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
+ try:
+ cmd.result = await client.execute_command(
+ *cmd.args, **cmd.kwargs
+ )
+ except Exception as e:
+ cmd.result = e
+
+ if raise_on_error:
+ for cmd in todo:
+ result = cmd.result
+ if isinstance(result, Exception):
+ command = " ".join(map(safe_str, cmd.args))
+ msg = (
+ f"Command # {cmd.position + 1} ({command}) of pipeline "
+ f"caused error: {result.args}"
)
- except Exception as e:
- cmd.result = e
-
- responses = [cmd.result for cmd in stack]
-
- if raise_on_error:
- for cmd in stack:
- result = cmd.result
- if isinstance(result, Exception):
- command = " ".join(map(safe_str, cmd.args))
- msg = (
- f"Command # {cmd.position + 1} ({command}) of pipeline "
- f"caused error: {result.args}"
- )
- result.args = (msg,) + result.args[1:]
- raise result
+ result.args = (msg,) + result.args[1:]
+ raise result
- return responses
+ return [cmd.result for cmd in stack]
def _split_command_across_slots(
self, command: str, *keys: KeyT
diff --git a/redis/asyncio/parser.py b/redis/asyncio/parser.py
index 6286351..9518c3f 100644
--- a/redis/asyncio/parser.py
+++ b/redis/asyncio/parser.py
@@ -22,13 +22,16 @@ class CommandsParser:
So, don't use this with EVAL or EVALSHA.
"""
- __slots__ = ("commands",)
+ __slots__ = ("commands", "node")
def __init__(self) -> None:
self.commands: Dict[str, Union[int, Dict[str, Any]]] = {}
- async def initialize(self, r: "ClusterNode") -> None:
- commands = await r.execute_command("COMMAND")
+ async def initialize(self, node: Optional["ClusterNode"] = None) -> None:
+ if node:
+ self.node = node
+
+ commands = await self.node.execute_command("COMMAND")
for cmd, command in commands.items():
if "movablekeys" in command["flags"]:
commands[cmd] = -1
@@ -41,9 +44,7 @@ class CommandsParser:
# As soon as this PR is merged into Redis, we should reimplement
# our logic to use COMMAND INFO changes to determine the key positions
# https://github.com/redis/redis/pull/8324
- async def get_keys(
- self, redis_conn: "ClusterNode", *args: Any
- ) -> Optional[Tuple[str, ...]]:
+ async def get_keys(self, *args: Any) -> Optional[Tuple[str, ...]]:
if len(args) < 2:
# The command has no keys in it
return None
@@ -58,7 +59,7 @@ class CommandsParser:
if cmd_name not in self.commands:
# We'll try to reinitialize the commands cache, if the engine
# version has changed, the commands may not be current
- await self.initialize(redis_conn)
+ await self.initialize()
if cmd_name not in self.commands:
raise RedisError(
f"{cmd_name.upper()} command doesn't exist in Redis commands"
@@ -71,18 +72,16 @@ class CommandsParser:
if command == 0:
return None
if command == -1:
- return await self._get_moveable_keys(redis_conn, *args)
+ return await self._get_moveable_keys(*args)
last_key_pos = command["last_key_pos"]
if last_key_pos < 0:
last_key_pos = len(args) + last_key_pos
return args[command["first_key_pos"] : last_key_pos + 1 : command["step_count"]]
- async def _get_moveable_keys(
- self, redis_conn: "ClusterNode", *args: Any
- ) -> Optional[Tuple[str, ...]]:
+ async def _get_moveable_keys(self, *args: Any) -> Optional[Tuple[str, ...]]:
try:
- keys = await redis_conn.execute_command("COMMAND GETKEYS", *args)
+ keys = await self.node.execute_command("COMMAND GETKEYS", *args)
except ResponseError as e:
message = e.__str__()
if (