From 827dcde5c0af5f7aa9bdc3999fc86aa2ba945118 Mon Sep 17 00:00:00 2001 From: Utkarsh Gupta Date: Wed, 23 Mar 2022 14:48:16 +0530 Subject: [CLUSTER] Fix scan command cursors & Fix scan_iter (#2054) * cluster/scan: fix return cursor & change default node to primaries * cluster/scan_iter: fix iteration Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com> --- redis/commands/cluster.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) (limited to 'redis/commands/cluster.py') diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index d68f93b..bcbfbd5 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -1,5 +1,8 @@ +from typing import Iterator, Union + from redis.crc import key_slot from redis.exceptions import RedisClusterException, RedisError +from redis.typing import PatternT from .core import ( ACLCommands, @@ -206,6 +209,41 @@ class ClusterDataAccessCommands(DataAccessCommands): **kwargs, ) + def scan_iter( + self, + match: Union[PatternT, None] = None, + count: Union[int, None] = None, + _type: Union[str, None] = None, + **kwargs, + ) -> Iterator: + # Do the first query with cursor=0 for all nodes + cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) + yield from data + + cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} + if cursors: + # Get nodes by name + nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} + + # Iterate over each node till its cursor is 0 + kwargs.pop("target_nodes", None) + while cursors: + for name, cursor in cursors.items(): + cur, data = self.scan( + cursor=cursor, + match=match, + count=count, + _type=_type, + target_nodes=nodes[name], + **kwargs, + ) + yield from data + cursors[name] = cur[name] + + cursors = { + name: cursor for name, cursor in cursors.items() if cursor != 0 + } + class RedisClusterCommands( ClusterMultiKeyCommands, -- cgit v1.2.1