diff options
author | Utkarsh Gupta <utkarshgupta137@gmail.com> | 2022-06-01 17:15:50 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-01 14:45:50 +0300 |
commit | 05fc203f68c24fbd54c7b338b4610fa62972c326 (patch) | |
tree | 4550f62dc9c2ce6d2822b9bbaeaf227cb82ad75b /redis/asyncio/parser.py | |
parent | 7880460b72aca49aa5b9512f0995c0d17d884a7d (diff) | |
download | redis-py-05fc203f68c24fbd54c7b338b4610fa62972c326.tar.gz |
async_cluster: optimisations (#2205)
- return true from execute_pipeline if there are any errors
- use todo list to speedup retries
- store initialisation node in CommandsParser object
- add sync context manager for pipeline
- use if/else instead of try/except
- make command a function argument in _determine_nodes & _determine_slot
- add async cluster pipeline benchmark script
Diffstat (limited to 'redis/asyncio/parser.py')
-rw-r--r-- | redis/asyncio/parser.py | 23 |
1 files changed, 11 insertions, 12 deletions
diff --git a/redis/asyncio/parser.py b/redis/asyncio/parser.py index 6286351..9518c3f 100644 --- a/redis/asyncio/parser.py +++ b/redis/asyncio/parser.py @@ -22,13 +22,16 @@ class CommandsParser: So, don't use this with EVAL or EVALSHA. """ - __slots__ = ("commands",) + __slots__ = ("commands", "node") def __init__(self) -> None: self.commands: Dict[str, Union[int, Dict[str, Any]]] = {} - async def initialize(self, r: "ClusterNode") -> None: - commands = await r.execute_command("COMMAND") + async def initialize(self, node: Optional["ClusterNode"] = None) -> None: + if node: + self.node = node + + commands = await self.node.execute_command("COMMAND") for cmd, command in commands.items(): if "movablekeys" in command["flags"]: commands[cmd] = -1 @@ -41,9 +44,7 @@ class CommandsParser: # As soon as this PR is merged into Redis, we should reimplement # our logic to use COMMAND INFO changes to determine the key positions # https://github.com/redis/redis/pull/8324 - async def get_keys( - self, redis_conn: "ClusterNode", *args: Any - ) -> Optional[Tuple[str, ...]]: + async def get_keys(self, *args: Any) -> Optional[Tuple[str, ...]]: if len(args) < 2: # The command has no keys in it return None @@ -58,7 +59,7 @@ class CommandsParser: if cmd_name not in self.commands: # We'll try to reinitialize the commands cache, if the engine # version has changed, the commands may not be current - await self.initialize(redis_conn) + await self.initialize() if cmd_name not in self.commands: raise RedisError( f"{cmd_name.upper()} command doesn't exist in Redis commands" @@ -71,18 +72,16 @@ class CommandsParser: if command == 0: return None if command == -1: - return await self._get_moveable_keys(redis_conn, *args) + return await self._get_moveable_keys(*args) last_key_pos = command["last_key_pos"] if last_key_pos < 0: last_key_pos = len(args) + last_key_pos return args[command["first_key_pos"] : last_key_pos + 1 : command["step_count"]] - async def _get_moveable_keys( - self, redis_conn: "ClusterNode", *args: Any - ) -> Optional[Tuple[str, ...]]: + async def _get_moveable_keys(self, *args: Any) -> Optional[Tuple[str, ...]]: try: - keys = await redis_conn.execute_command("COMMAND GETKEYS", *args) + keys = await self.node.execute_command("COMMAND GETKEYS", *args) except ResponseError as e: message = e.__str__() if ( |