From ae171d16e173c367256b1da42f66947fd3c6d1ea Mon Sep 17 00:00:00 2001 From: Utkarsh Gupta Date: Sun, 24 Jul 2022 18:05:01 +0530 Subject: async_cluster: fix concurrent pipeline (#2280) - each pipeline should create separate stacks for each node --- redis/asyncio/cluster.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'redis/asyncio/cluster.py') diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 2894004..3fe3ebc 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -755,7 +755,6 @@ class ClusterNode: """ __slots__ = ( - "_command_stack", "_connections", "_free", "connection_class", @@ -796,7 +795,6 @@ class ClusterNode: self._connections: List[Connection] = [] self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections) - self._command_stack: List["PipelineCommand"] = [] def __repr__(self) -> str: return ( @@ -887,18 +885,18 @@ class ClusterNode: # Release connection self._free.append(connection) - async def execute_pipeline(self) -> bool: + async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: # Acquire connection connection = self.acquire_connection() # Execute command await connection.send_packed_command( - connection.pack_commands(cmd.args for cmd in self._command_stack), False + connection.pack_commands(cmd.args for cmd in commands), False ) # Read responses ret = False - for cmd in self._command_stack: + for cmd in commands: try: cmd.result = await self.parse_response( connection, cmd.args[0], **cmd.kwargs @@ -1365,12 +1363,14 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm node = target_nodes[0] if node.name not in nodes: - nodes[node.name] = node - node._command_stack = [] - node._command_stack.append(cmd) + nodes[node.name] = (node, []) + nodes[node.name][1].append(cmd) errors = await asyncio.gather( - *(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values()) + *( + asyncio.ensure_future(node[0].execute_pipeline(node[1])) + for node in nodes.values() + ) ) if any(errors): -- cgit v1.2.1