diff options
author | Utkarsh Gupta <utkarshgupta137@gmail.com> | 2022-06-01 17:15:50 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-01 14:45:50 +0300 |
commit | 05fc203f68c24fbd54c7b338b4610fa62972c326 (patch) | |
tree | 4550f62dc9c2ce6d2822b9bbaeaf227cb82ad75b | |
parent | 7880460b72aca49aa5b9512f0995c0d17d884a7d (diff) | |
download | redis-py-05fc203f68c24fbd54c7b338b4610fa62972c326.tar.gz |
async_cluster: optimisations (#2205)
- return True from execute_pipeline if any command resulted in an error
- use todo list to speed up retries
- store initialisation node in CommandsParser object
- add sync context manager for pipeline
- use if/else instead of try/except
- make command a function argument in _determine_nodes & _determine_slot
- add async cluster pipeline benchmark script
-rw-r--r-- | benchmarks/cluster_async.py | 4 | ||||
-rw-r--r-- | benchmarks/cluster_async_pipeline.py | 107 | ||||
-rw-r--r-- | redis/asyncio/cluster.py | 165 | ||||
-rw-r--r-- | redis/asyncio/parser.py | 23 |
4 files changed, 206 insertions, 93 deletions
diff --git a/benchmarks/cluster_async.py b/benchmarks/cluster_async.py index fd2ab46..17dd52b 100644 --- a/benchmarks/cluster_async.py +++ b/benchmarks/cluster_async.py @@ -249,8 +249,8 @@ if __name__ == "__main__": port = 16379 password = None - count = 1000 - size = 16 + count = 10000 + size = 256 asyncio.run(main("asyncio")) asyncio.run(main("asyncio", gather=False)) diff --git a/benchmarks/cluster_async_pipeline.py b/benchmarks/cluster_async_pipeline.py new file mode 100644 index 0000000..af45b44 --- /dev/null +++ b/benchmarks/cluster_async_pipeline.py @@ -0,0 +1,107 @@ +import asyncio +import functools +import time + +import aioredis_cluster +import aredis +import uvloop + +import redis.asyncio as redispy + + +def timer(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + tic = time.perf_counter() + await func(*args, **kwargs) + toc = time.perf_counter() + return f"{toc - tic:.4f}" + + return wrapper + + +@timer +async def warmup(client): + await asyncio.gather( + *(asyncio.create_task(client.exists(f"bench:warmup_{i}")) for i in range(100)) + ) + + +@timer +async def run(client): + data_str = "a" * size + data_int = int("1" * size) + + for i in range(count): + with client.pipeline() as pipe: + await ( + pipe.set(f"bench:str_{i}", data_str) + .set(f"bench:int_{i}", data_int) + .get(f"bench:str_{i}") + .get(f"bench:int_{i}") + .hset("bench:hset", str(i), data_str) + .hget("bench:hset", str(i)) + .incr("bench:incr") + .lpush("bench:lpush", data_int) + .lrange("bench:lpush", 0, 300) + .lpop("bench:lpush") + .execute() + ) + + +async def main(loop): + arc = aredis.StrictRedisCluster( + host=host, + port=port, + password=password, + max_connections=2**31, + max_connections_per_node=2**31, + readonly=False, + reinitialize_steps=count, + skip_full_coverage_check=True, + decode_responses=False, + max_idle_time=count, + idle_check_interval=count, + ) + print(f"{loop} {await warmup(arc)} aredis") + print(await run(arc)) + 
arc.connection_pool.disconnect() + + aiorc = await aioredis_cluster.create_redis_cluster( + [(host, port)], + password=password, + state_reload_interval=count, + idle_connection_timeout=count, + pool_maxsize=2**31, + ) + print(f"{loop} {await warmup(aiorc)} aioredis-cluster") + print(await run(aiorc)) + aiorc.close() + await aiorc.wait_closed() + + async with redispy.RedisCluster( + host=host, + port=port, + password=password, + reinitialize_steps=count, + read_from_replicas=False, + decode_responses=False, + max_connections=2**31, + ) as rca: + print(f"{loop} {await warmup(rca)} redispy") + print(await run(rca)) + + +if __name__ == "__main__": + host = "localhost" + port = 16379 + password = None + + count = 10000 + size = 256 + + asyncio.run(main("asyncio")) + + uvloop.install() + + asyncio.run(main("uvloop")) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 3405a49..b53f848 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -468,9 +468,8 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand return key_slot(k) async def _determine_nodes( - self, *args: Any, node_flag: Optional[str] = None + self, command: str, *args: Any, node_flag: Optional[str] = None ) -> List["ClusterNode"]: - command = args[0] if not node_flag: # get the nodes group for this command if it was predefined node_flag = self.command_flags.get(command) @@ -495,16 +494,15 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand # get the node that holds the key's slot return [ self.nodes_manager.get_node_from_slot( - await self._determine_slot(*args), + await self._determine_slot(command, *args), self.read_from_replicas and command in READ_COMMANDS, ) ] - async def _determine_slot(self, *args: Any) -> int: - command = args[0] + async def _determine_slot(self, command: str, *args: Any) -> int: if self.command_flags.get(command) == SLOT_ID: # The command contains the slot ID - return int(args[1]) + 
return int(args[0]) # Get the keys in the command @@ -516,19 +514,17 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand # - fix: https://github.com/redis/redis/pull/9733 if command in ("EVAL", "EVALSHA"): # command syntax: EVAL "script body" num_keys ... - if len(args) <= 2: - raise RedisClusterException(f"Invalid args in command: {args}") - num_actual_keys = args[2] - eval_keys = args[3 : 3 + num_actual_keys] + if len(args) < 2: + raise RedisClusterException( + f"Invalid args in command: {command, *args}" + ) + keys = args[2 : 2 + args[1]] # if there are 0 keys, that means the script can be run on any node # so we can just return a random slot - if not eval_keys: + if not keys: return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS) - keys = eval_keys else: - keys = await self.commands_parser.get_keys( - self.nodes_manager.default_node, *args - ) + keys = await self.commands_parser.get_keys(command, *args) if not keys: # FCALL can call a function with 0 keys, that means the function # can be run on any node so we can just return a random slot @@ -848,13 +844,13 @@ class ClusterNode: self._free.append(connection) return self._free.popleft() - else: - if len(self._connections) < self.max_connections: - connection = self.connection_class(**self.connection_kwargs) - self._connections.append(connection) - return connection - else: - raise ConnectionError("Too many connections") + + if len(self._connections) < self.max_connections: + connection = self.connection_class(**self.connection_kwargs) + self._connections.append(connection) + return connection + + raise ConnectionError("Too many connections") async def parse_response( self, connection: Connection, command: str, **kwargs: Any @@ -872,10 +868,10 @@ class ClusterNode: raise # Return response - try: + if command in self.response_callbacks: return self.response_callbacks[command](response, **kwargs) - except KeyError: - return response + + return response async def execute_command(self, 
*args: Any, **kwargs: Any) -> Any: # Acquire connection @@ -891,7 +887,7 @@ class ClusterNode: # Release connection self._free.append(connection) - async def execute_pipeline(self) -> None: + async def execute_pipeline(self) -> bool: # Acquire connection connection = self.acquire_connection() @@ -901,17 +897,20 @@ class ClusterNode: ) # Read responses - try: - for cmd in self._command_stack: - try: - cmd.result = await self.parse_response( - connection, cmd.args[0], **cmd.kwargs - ) - except Exception as e: - cmd.result = e - finally: - # Release connection - self._free.append(connection) + ret = False + for cmd in self._command_stack: + try: + cmd.result = await self.parse_response( + connection, cmd.args[0], **cmd.kwargs + ) + except Exception as e: + cmd.result = e + ret = True + + # Release connection + self._free.append(connection) + + return ret class NodesManager: @@ -1257,6 +1256,13 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm def __await__(self) -> Generator[Any, None, "ClusterPipeline"]: return self.initialize().__await__() + def __enter__(self) -> "ClusterPipeline": + self._command_stack = [] + return self + + def __exit__(self, exc_type: None, exc_value: None, traceback: None) -> None: + self._command_stack = [] + def __bool__(self) -> bool: return bool(self._command_stack) @@ -1310,6 +1316,7 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm try: return await self._execute( + self._client, self._command_stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections, @@ -1331,60 +1338,60 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm async def _execute( self, + client: "RedisCluster", stack: List["PipelineCommand"], raise_on_error: bool = True, allow_redirections: bool = True, ) -> List[Any]: - client = self._client + todo = [ + cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception) + ] + nodes = {} - for cmd in 
stack: - if not cmd.result or isinstance(cmd.result, Exception): - target_nodes = await client._determine_nodes(*cmd.args) - if not target_nodes: - raise RedisClusterException( - f"No targets were found to execute {cmd.args} command on" - ) - if len(target_nodes) > 1: - raise RedisClusterException( - f"Too many targets for command {cmd.args}" - ) + for cmd in todo: + target_nodes = await client._determine_nodes(*cmd.args) + if not target_nodes: + raise RedisClusterException( + f"No targets were found to execute {cmd.args} command on" + ) + if len(target_nodes) > 1: + raise RedisClusterException(f"Too many targets for command {cmd.args}") - node = target_nodes[0] - if node.name not in nodes: - nodes[node.name] = node - node._command_stack = [] - node._command_stack.append(cmd) + node = target_nodes[0] + if node.name not in nodes: + nodes[node.name] = node + node._command_stack = [] + node._command_stack.append(cmd) - await asyncio.gather( + errors = await asyncio.gather( *(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values()) ) - if allow_redirections: - # send each errored command individually - for cmd in stack: - if isinstance(cmd.result, (TryAgainError, MovedError, AskError)): - try: - cmd.result = await client.execute_command( - *cmd.args, **cmd.kwargs + if any(errors): + if allow_redirections: + # send each errored command individually + for cmd in todo: + if isinstance(cmd.result, (TryAgainError, MovedError, AskError)): + try: + cmd.result = await client.execute_command( + *cmd.args, **cmd.kwargs + ) + except Exception as e: + cmd.result = e + + if raise_on_error: + for cmd in todo: + result = cmd.result + if isinstance(result, Exception): + command = " ".join(map(safe_str, cmd.args)) + msg = ( + f"Command # {cmd.position + 1} ({command}) of pipeline " + f"caused error: {result.args}" ) - except Exception as e: - cmd.result = e - - responses = [cmd.result for cmd in stack] - - if raise_on_error: - for cmd in stack: - result = cmd.result - 
if isinstance(result, Exception): - command = " ".join(map(safe_str, cmd.args)) - msg = ( - f"Command # {cmd.position + 1} ({command}) of pipeline " - f"caused error: {result.args}" - ) - result.args = (msg,) + result.args[1:] - raise result + result.args = (msg,) + result.args[1:] + raise result - return responses + return [cmd.result for cmd in stack] def _split_command_across_slots( self, command: str, *keys: KeyT diff --git a/redis/asyncio/parser.py b/redis/asyncio/parser.py index 6286351..9518c3f 100644 --- a/redis/asyncio/parser.py +++ b/redis/asyncio/parser.py @@ -22,13 +22,16 @@ class CommandsParser: So, don't use this with EVAL or EVALSHA. """ - __slots__ = ("commands",) + __slots__ = ("commands", "node") def __init__(self) -> None: self.commands: Dict[str, Union[int, Dict[str, Any]]] = {} - async def initialize(self, r: "ClusterNode") -> None: - commands = await r.execute_command("COMMAND") + async def initialize(self, node: Optional["ClusterNode"] = None) -> None: + if node: + self.node = node + + commands = await self.node.execute_command("COMMAND") for cmd, command in commands.items(): if "movablekeys" in command["flags"]: commands[cmd] = -1 @@ -41,9 +44,7 @@ class CommandsParser: # As soon as this PR is merged into Redis, we should reimplement # our logic to use COMMAND INFO changes to determine the key positions # https://github.com/redis/redis/pull/8324 - async def get_keys( - self, redis_conn: "ClusterNode", *args: Any - ) -> Optional[Tuple[str, ...]]: + async def get_keys(self, *args: Any) -> Optional[Tuple[str, ...]]: if len(args) < 2: # The command has no keys in it return None @@ -58,7 +59,7 @@ class CommandsParser: if cmd_name not in self.commands: # We'll try to reinitialize the commands cache, if the engine # version has changed, the commands may not be current - await self.initialize(redis_conn) + await self.initialize() if cmd_name not in self.commands: raise RedisError( f"{cmd_name.upper()} command doesn't exist in Redis commands" @@ 
-71,18 +72,16 @@ class CommandsParser: if command == 0: return None if command == -1: - return await self._get_moveable_keys(redis_conn, *args) + return await self._get_moveable_keys(*args) last_key_pos = command["last_key_pos"] if last_key_pos < 0: last_key_pos = len(args) + last_key_pos return args[command["first_key_pos"] : last_key_pos + 1 : command["step_count"]] - async def _get_moveable_keys( - self, redis_conn: "ClusterNode", *args: Any - ) -> Optional[Tuple[str, ...]]: + async def _get_moveable_keys(self, *args: Any) -> Optional[Tuple[str, ...]]: try: - keys = await redis_conn.execute_command("COMMAND GETKEYS", *args) + keys = await self.node.execute_command("COMMAND GETKEYS", *args) except ResponseError as e: message = e.__str__() if ( |