diff options
author | Utkarsh Gupta <utkarshgupta137@gmail.com> | 2022-05-08 17:34:20 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-08 15:04:20 +0300 |
commit | 061d97abe21d3a8ce9738330cabf771dd05c8dc1 (patch) | |
tree | e64d64c5917312a65304f4d630775d08adb38bfa /redis/commands/cluster.py | |
parent | c25be04d6468163d31908774ed358d3fd6bc0a39 (diff) | |
download | redis-py-061d97abe21d3a8ce9738330cabf771dd05c8dc1.tar.gz |
Add Async RedisCluster (#2099)
* Copy Cluster Client, Commands, Commands Parser, Tests for asyncio
* Async Cluster Tests: Async/Await
* Add Async RedisCluster
* cluster: use ERRORS_ALLOW_RETRY from self.__class__
* async_cluster: rework redis_connection, initialize, & close
- move redis_connection from NodesManager to ClusterNode & handle all related logic in ClusterNode class
- use Locks while initializing or closing
- in case of error, close connections instead of instantly reinitializing
- create ResourceWarning instead of manually deleting client object
- use asyncio.gather to run commands/initialize/close in parallel
- inline single use functions
- fix test_acl_log for py3.6
* async_cluster: add types
* async_cluster: add docs
* docs: update sphinx & add sphinx_autodoc_typehints
* async_cluster: move TargetNodesT to cluster module
* async_cluster/commands: inherit commands from sync class if possible
* async_cluster: add benchmark script with aredis & aioredis-cluster
* async_cluster: remove logging
* async_cluster: inline functions
* async_cluster: manage Connection instead of Redis Client
* async_cluster/commands: optimize parser
* async_cluster: use ensure_future & generators for gather
* async_conn: optimize
* async_cluster: optimize determine_slot
* async_cluster: optimize determine_nodes
* async_cluster/parser: optimize _get_moveable_keys
* async_cluster: inlined check_slots_coverage
* async_cluster: update docstrings
* async_cluster: add concurrent test & use read_response/_update_moved_slots without lock
Co-authored-by: Chayim <chayim@users.noreply.github.com>
Diffstat (limited to 'redis/commands/cluster.py')
-rw-r--r-- | redis/commands/cluster.py | 567 |
1 files changed, 408 insertions, 159 deletions
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index aaddb6a..06b702f 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -1,27 +1,57 @@ -from typing import Iterator, Union +import asyncio +from typing import ( + TYPE_CHECKING, + Any, + AsyncIterator, + Dict, + Iterable, + Iterator, + List, + Mapping, + NoReturn, + Optional, + Union, +) +from redis.compat import Literal from redis.crc import key_slot from redis.exceptions import RedisClusterException, RedisError -from redis.typing import PatternT +from redis.typing import ( + AnyKeyT, + ClusterCommandsProtocol, + EncodableT, + KeysT, + KeyT, + PatternT, +) from .core import ( ACLCommands, + AsyncACLCommands, + AsyncDataAccessCommands, + AsyncFunctionCommands, + AsyncManagementCommands, + AsyncScriptCommands, DataAccessCommands, FunctionCommands, ManagementCommands, PubSubCommands, + ResponseT, ScriptCommands, ) from .helpers import list_or_args from .redismodules import RedisModuleCommands +if TYPE_CHECKING: + from redis.asyncio.cluster import TargetNodesT + -class ClusterMultiKeyCommands: +class ClusterMultiKeyCommands(ClusterCommandsProtocol): """ A class containing commands that handle more than one key """ - def _partition_keys_by_slot(self, keys): + def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]: """ Split keys into a dictionary that maps a slot to a list of keys. @@ -34,7 +64,7 @@ class ClusterMultiKeyCommands: return slots_to_keys - def mget_nonatomic(self, keys, *args): + def mget_nonatomic(self, keys: KeysT, *args) -> List[Optional[Any]]: """ Splits the keys into different slots and then calls MGET for the keys of every slot. This operation will not be atomic @@ -70,7 +100,7 @@ class ClusterMultiKeyCommands: vals_in_order = [all_results[key] for key in keys] return vals_in_order - def mset_nonatomic(self, mapping): + def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: """ Sets key/values based on a mapping. Mapping is a dictionary of key/value pairs. Both keys and values should be strings or types that @@ -99,7 +129,7 @@ class ClusterMultiKeyCommands: return res - def _split_command_across_slots(self, command, *keys): + def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: """ Runs the given command once for the keys of each slot. Returns the sum of the return values. @@ -114,7 +144,7 @@ class ClusterMultiKeyCommands: return total - def exists(self, *keys): + def exists(self, *keys: KeyT) -> ResponseT: """ Returns the number of ``names`` that exist in the whole cluster. The keys are first split up into slots @@ -124,7 +154,7 @@ class ClusterMultiKeyCommands: """ return self._split_command_across_slots("EXISTS", *keys) - def delete(self, *keys): + def delete(self, *keys: KeyT) -> ResponseT: """ Deletes the given keys in the cluster. The keys are first split up into slots @@ -137,7 +167,7 @@ class ClusterMultiKeyCommands: """ return self._split_command_across_slots("DEL", *keys) - def touch(self, *keys): + def touch(self, *keys: KeyT) -> ResponseT: """ Updates the last access time of given keys across the cluster. @@ -152,7 +182,7 @@ class ClusterMultiKeyCommands: """ return self._split_command_across_slots("TOUCH", *keys) - def unlink(self, *keys): + def unlink(self, *keys: KeyT) -> ResponseT: """ Remove the specified keys in a different thread. @@ -167,160 +197,135 @@ class ClusterMultiKeyCommands: return self._split_command_across_slots("UNLINK", *keys) -class ClusterManagementCommands(ManagementCommands): +class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands): """ - A class for Redis Cluster management commands - - The class inherits from Redis's core ManagementCommands class and do the - required adjustments to work with cluster mode + A class containing commands that handle more than one key """ - def slaveof(self, *args, **kwargs): + async def mget_nonatomic(self, keys: KeysT, *args) -> List[Optional[Any]]: """ - Make the server a replica of another instance, or promote it as master. + Splits the keys into different slots and then calls MGET + for the keys of every slot. This operation will not be atomic + if keys belong to more than one slot. - For more information see https://redis.io/commands/slaveof - """ - raise RedisClusterException("SLAVEOF is not supported in cluster mode") + Returns a list of values ordered identically to ``keys`` - def replicaof(self, *args, **kwargs): + For more information see https://redis.io/commands/mget """ - Make the server a replica of another instance, or promote it as master. - For more information see https://redis.io/commands/replicaof - """ - raise RedisClusterException("REPLICAOF is not supported in cluster mode") + from redis.client import EMPTY_RESPONSE - def swapdb(self, *args, **kwargs): - """ - Swaps two Redis databases. + options = {} + if not args: + options[EMPTY_RESPONSE] = [] - For more information see https://redis.io/commands/swapdb - """ - raise RedisClusterException("SWAPDB is not supported in cluster mode") + # Concatenate all keys into a list + keys = list_or_args(keys, args) + # Split keys into slots + slots_to_keys = self._partition_keys_by_slot(keys) + # Call MGET for every slot and concatenate + # the results + # We must make sure that the keys are returned in order + all_values = await asyncio.gather( + *( + asyncio.ensure_future( + self.execute_command("MGET", *slot_keys, **options) + ) + for slot_keys in slots_to_keys.values() + ) + ) -class ClusterDataAccessCommands(DataAccessCommands): - """ - A class for Redis Cluster Data Access Commands + all_results = {} + for slot_keys, slot_values in zip(slots_to_keys.values(), all_values): + all_results.update(dict(zip(slot_keys, slot_values))) - The class inherits from Redis's core DataAccessCommand class and do the - required adjustments to work with cluster mode - """ + # Sort the results + vals_in_order = [all_results[key] for key in keys] + return vals_in_order - def stralgo( - self, - algo, - value1, - value2, - specific_argument="strings", - len=False, - idx=False, - minmatchlen=None, - withmatchlen=False, - **kwargs, - ): + async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]: """ - Implements complex algorithms that operate on strings. - Right now the only algorithm implemented is the LCS algorithm - (longest common substring). However new algorithms could be - implemented in the future. + Sets key/values based on a mapping. Mapping is a dictionary of + key/value pairs. Both keys and values should be strings or types that + can be cast to a string via str(). - ``algo`` Right now must be LCS - ``value1`` and ``value2`` Can be two strings or two keys - ``specific_argument`` Specifying if the arguments to the algorithm - will be keys or strings. strings is the default. - ``len`` Returns just the len of the match. - ``idx`` Returns the match positions in each string. - ``minmatchlen`` Restrict the list of matches to the ones of a given - minimal length. Can be provided only when ``idx`` set to True. - ``withmatchlen`` Returns the matches with the len of the match. - Can be provided only when ``idx`` set to True. + Splits the keys into different slots and then calls MSET + for the keys of every slot. This operation will not be atomic + if keys belong to more than one slot. - For more information see https://redis.io/commands/stralgo + For more information see https://redis.io/commands/mset """ - target_nodes = kwargs.pop("target_nodes", None) - if specific_argument == "strings" and target_nodes is None: - target_nodes = "default-node" - kwargs.update({"target_nodes": target_nodes}) - return super().stralgo( - algo, - value1, - value2, - specific_argument, - len, - idx, - minmatchlen, - withmatchlen, - **kwargs, - ) - def scan_iter( - self, - match: Union[PatternT, None] = None, - count: Union[int, None] = None, - _type: Union[str, None] = None, - **kwargs, - ) -> Iterator: - # Do the first query with cursor=0 for all nodes - cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) - yield from data + # Partition the keys by slot + slots_to_pairs = {} + for pair in mapping.items(): + # encode the key + k = self.encoder.encode(pair[0]) + slot = key_slot(k) + slots_to_pairs.setdefault(slot, []).extend(pair) - cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} - if cursors: - # Get nodes by name - nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} + # Call MSET for every slot and concatenate + # the results (one result per slot) + return await asyncio.gather( + *( + asyncio.ensure_future(self.execute_command("MSET", *pairs)) + for pairs in slots_to_pairs.values() + ) + ) - # Iterate over each node till its cursor is 0 - kwargs.pop("target_nodes", None) - while cursors: - for name, cursor in cursors.items(): - cur, data = self.scan( - cursor=cursor, - match=match, - count=count, - _type=_type, - target_nodes=nodes[name], - **kwargs, - ) - yield from data - cursors[name] = cur[name] + async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int: + """ + Runs the given command once for the keys + of each slot. Returns the sum of the return values. + """ + # Partition the keys by slot + slots_to_keys = self._partition_keys_by_slot(keys) - cursors = { - name: cursor for name, cursor in cursors.items() if cursor != 0 - } + # Sum up the reply from each command + return sum( + await asyncio.gather( + *( + asyncio.ensure_future(self.execute_command(command, *slot_keys)) + for slot_keys in slots_to_keys.values() + ) + ) + ) -class RedisClusterCommands( - ClusterMultiKeyCommands, - ClusterManagementCommands, - ACLCommands, - PubSubCommands, - ClusterDataAccessCommands, - ScriptCommands, - FunctionCommands, - RedisModuleCommands, -): +class ClusterManagementCommands(ManagementCommands): """ - A class for all Redis Cluster commands + A class for Redis Cluster management commands - For key-based commands, the target node(s) will be internally determined - by the keys' hash slot. - Non-key-based commands can be executed with the 'target_nodes' argument to - target specific nodes. By default, if target_nodes is not specified, the - command will be executed on the default cluster node. + The class inherits from Redis's core ManagementCommands class and do the + required adjustments to work with cluster mode + """ - :param :target_nodes: type can be one of the followings: - - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM - - 'ClusterNode' - - 'list(ClusterNodes)' - - 'dict(any:clusterNodes)' + def slaveof(self, *args, **kwargs) -> NoReturn: + """ + Make the server a replica of another instance, or promote it as master. - for example: - r.cluster_info(target_nodes=RedisCluster.ALL_NODES) - """ + For more information see https://redis.io/commands/slaveof + """ + raise RedisClusterException("SLAVEOF is not supported in cluster mode") - def cluster_myid(self, target_node): + def replicaof(self, *args, **kwargs) -> NoReturn: + """ + Make the server a replica of another instance, or promote it as master. + + For more information see https://redis.io/commands/replicaof + """ + raise RedisClusterException("REPLICAOF is not supported in cluster mode") + + def swapdb(self, *args, **kwargs) -> NoReturn: + """ + Swaps two Redis databases. + + For more information see https://redis.io/commands/swapdb + """ + raise RedisClusterException("SWAPDB is not supported in cluster mode") + + def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT: """ Returns the node's id. @@ -331,7 +336,9 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER MYID", target_nodes=target_node) - def cluster_addslots(self, target_node, *slots): + def cluster_addslots( + self, target_node: "TargetNodesT", *slots: EncodableT + ) -> ResponseT: """ Assign new hash slots to receiving node. Sends to specified node. @@ -344,7 +351,9 @@ class RedisClusterCommands( "CLUSTER ADDSLOTS", *slots, target_nodes=target_node ) - def cluster_addslotsrange(self, target_node, *slots): + def cluster_addslotsrange( + self, target_node: "TargetNodesT", *slots: EncodableT + ) -> ResponseT: """ Similar to the CLUSTER ADDSLOTS command. The difference between the two commands is that ADDSLOTS takes a list of slots @@ -360,7 +369,7 @@ class RedisClusterCommands( "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node ) - def cluster_countkeysinslot(self, slot_id): + def cluster_countkeysinslot(self, slot_id: int) -> ResponseT: """ Return the number of local keys in the specified hash slot Send to node based on specified slot_id @@ -369,7 +378,7 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id) - def cluster_count_failure_report(self, node_id): + def cluster_count_failure_report(self, node_id: str) -> ResponseT: """ Return the number of failure reports active for a given node Sends to a random node @@ -378,7 +387,7 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id) - def cluster_delslots(self, *slots): + def cluster_delslots(self, *slots: EncodableT) -> List[bool]: """ Set hash slots as unbound in the cluster. It determines by it self what node the slot is in and sends it there @@ -389,7 +398,7 @@ class RedisClusterCommands( """ return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots] - def cluster_delslotsrange(self, *slots): + def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT: """ Similar to the CLUSTER DELSLOTS command. The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove @@ -400,7 +409,9 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER DELSLOTSRANGE", *slots) - def cluster_failover(self, target_node, option=None): + def cluster_failover( + self, target_node: "TargetNodesT", option: Optional[str] = None + ) -> ResponseT: """ Forces a slave to perform a manual failover of its master Sends to specified node @@ -422,7 +433,7 @@ class RedisClusterCommands( else: return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node) - def cluster_info(self, target_nodes=None): + def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: """ Provides info about Redis Cluster node state. The command will be sent to a random node in the cluster if no target @@ -432,7 +443,7 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER INFO", target_nodes=target_nodes) - def cluster_keyslot(self, key): + def cluster_keyslot(self, key: str) -> ResponseT: """ Returns the hash slot of the specified key Sends to random node in the cluster @@ -441,7 +452,9 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER KEYSLOT", key) - def cluster_meet(self, host, port, target_nodes=None): + def cluster_meet( + self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None + ) -> ResponseT: """ Force a node cluster to handshake with another node. Sends to specified node. @@ -452,7 +465,7 @@ class RedisClusterCommands( "CLUSTER MEET", host, port, target_nodes=target_nodes ) - def cluster_nodes(self): + def cluster_nodes(self) -> ResponseT: """ Get Cluster config for the node. Sends to random node in the cluster @@ -461,7 +474,9 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER NODES") - def cluster_replicate(self, target_nodes, node_id): + def cluster_replicate( + self, target_nodes: "TargetNodesT", node_id: str + ) -> ResponseT: """ Reconfigure a node as a slave of the specified master node @@ -471,7 +486,9 @@ class RedisClusterCommands( "CLUSTER REPLICATE", node_id, target_nodes=target_nodes ) - def cluster_reset(self, soft=True, target_nodes=None): + def cluster_reset( + self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None + ) -> ResponseT: """ Reset a Redis Cluster node @@ -484,7 +501,9 @@ class RedisClusterCommands( "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes ) - def cluster_save_config(self, target_nodes=None): + def cluster_save_config( + self, target_nodes: Optional["TargetNodesT"] = None + ) -> ResponseT: """ Forces the node to save cluster state on disk @@ -492,7 +511,7 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes) - def cluster_get_keys_in_slot(self, slot, num_keys): + def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT: """ Returns the number of keys in the specified cluster slot @@ -500,7 +519,9 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) - def cluster_set_config_epoch(self, epoch, target_nodes=None): + def cluster_set_config_epoch( + self, epoch: int, target_nodes: Optional["TargetNodesT"] = None + ) -> ResponseT: """ Set the configuration epoch in a new node @@ -510,7 +531,9 @@ class RedisClusterCommands( "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes ) - def cluster_setslot(self, target_node, node_id, slot_id, state): + def cluster_setslot( + self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str + ) -> ResponseT: """ Bind an hash slot to a specific node @@ -528,7 +551,7 @@ class RedisClusterCommands( else: raise RedisError(f"Invalid slot state: {state}") - def cluster_setslot_stable(self, slot_id): + def cluster_setslot_stable(self, slot_id: int) -> ResponseT: """ Clears migrating / importing state from the slot. It determines by it self what node the slot is in and sends it there. @@ -537,7 +560,9 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") - def cluster_replicas(self, node_id, target_nodes=None): + def cluster_replicas( + self, node_id: str, target_nodes: Optional["TargetNodesT"] = None + ) -> ResponseT: """ Provides a list of replica nodes replicating from the specified primary target node. @@ -548,7 +573,7 @@ class RedisClusterCommands( "CLUSTER REPLICAS", node_id, target_nodes=target_nodes ) - def cluster_slots(self, target_nodes=None): + def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: """ Get array of Cluster slot to node mappings @@ -556,7 +581,7 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes) - def cluster_links(self, target_node): + def cluster_links(self, target_node: "TargetNodesT") -> ResponseT: """ Each node in a Redis Cluster maintains a pair of long-lived TCP link with each peer in the cluster: One for sending outbound messages towards the peer and one @@ -568,7 +593,7 @@ class RedisClusterCommands( """ return self.execute_command("CLUSTER LINKS", target_nodes=target_node) - def readonly(self, target_nodes=None): + def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: """ Enables read queries. The command will be sent to the default cluster node if target_nodes is @@ -582,7 +607,7 @@ class RedisClusterCommands( self.read_from_replicas = True return self.execute_command("READONLY", target_nodes=target_nodes) - def readwrite(self, target_nodes=None): + def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT: """ Disables read queries. The command will be sent to the default cluster node if target_nodes is @@ -593,3 +618,227 @@ class RedisClusterCommands( # Reset read from replicas flag self.read_from_replicas = False return self.execute_command("READWRITE", target_nodes=target_nodes) + + +class AsyncClusterManagementCommands( + ClusterManagementCommands, AsyncManagementCommands +): + """ + A class for Redis Cluster management commands + + The class inherits from Redis's core ManagementCommands class and do the + required adjustments to work with cluster mode + """ + + async def cluster_delslots(self, *slots: EncodableT) -> List[bool]: + """ + Set hash slots as unbound in the cluster. + It determines by it self what node the slot is in and sends it there + + Returns a list of the results for each processed slot. + + For more information see https://redis.io/commands/cluster-delslots + """ + return await asyncio.gather( + *( + asyncio.ensure_future(self.execute_command("CLUSTER DELSLOTS", slot)) + for slot in slots + ) + ) + + +class ClusterDataAccessCommands(DataAccessCommands): + """ + A class for Redis Cluster Data Access Commands + + The class inherits from Redis's core DataAccessCommand class and do the + required adjustments to work with cluster mode + """ + + def stralgo( + self, + algo: Literal["LCS"], + value1: KeyT, + value2: KeyT, + specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings", + len: bool = False, + idx: bool = False, + minmatchlen: Optional[int] = None, + withmatchlen: bool = False, + **kwargs, + ) -> ResponseT: + """ + Implements complex algorithms that operate on strings. + Right now the only algorithm implemented is the LCS algorithm + (longest common substring). However new algorithms could be + implemented in the future. + + ``algo`` Right now must be LCS + ``value1`` and ``value2`` Can be two strings or two keys + ``specific_argument`` Specifying if the arguments to the algorithm + will be keys or strings. strings is the default. + ``len`` Returns just the len of the match. + ``idx`` Returns the match positions in each string. + ``minmatchlen`` Restrict the list of matches to the ones of a given + minimal length. Can be provided only when ``idx`` set to True. + ``withmatchlen`` Returns the matches with the len of the match. + Can be provided only when ``idx`` set to True. + + For more information see https://redis.io/commands/stralgo + """ + target_nodes = kwargs.pop("target_nodes", None) + if specific_argument == "strings" and target_nodes is None: + target_nodes = "default-node" + kwargs.update({"target_nodes": target_nodes}) + return super().stralgo( + algo, + value1, + value2, + specific_argument, + len, + idx, + minmatchlen, + withmatchlen, + **kwargs, + ) + + def scan_iter( + self, + match: Optional[PatternT] = None, + count: Optional[int] = None, + _type: Optional[str] = None, + **kwargs, + ) -> Iterator: + # Do the first query with cursor=0 for all nodes + cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs) + yield from data + + cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} + if cursors: + # Get nodes by name + nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} + + # Iterate over each node till its cursor is 0 + kwargs.pop("target_nodes", None) + while cursors: + for name, cursor in cursors.items(): + cur, data = self.scan( + cursor=cursor, + match=match, + count=count, + _type=_type, + target_nodes=nodes[name], + **kwargs, + ) + yield from data + cursors[name] = cur[name] + + cursors = { + name: cursor for name, cursor in cursors.items() if cursor != 0 + } + + +class AsyncClusterDataAccessCommands( + ClusterDataAccessCommands, AsyncDataAccessCommands +): + """ + A class for Redis Cluster Data Access Commands + + The class inherits from Redis's core DataAccessCommand class and do the + required adjustments to work with cluster mode + """ + + async def scan_iter( + self, + match: Optional[PatternT] = None, + count: Optional[int] = None, + _type: Optional[str] = None, + **kwargs, + ) -> AsyncIterator: + # Do the first query with cursor=0 for all nodes + cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs) + for value in data: + yield value + + cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0} + if cursors: + # Get nodes by name + nodes = {name: self.get_node(node_name=name) for name in cursors.keys()} + + # Iterate over each node till its cursor is 0 + kwargs.pop("target_nodes", None) + while cursors: + for name, cursor in cursors.items(): + cur, data = await self.scan( + cursor=cursor, + match=match, + count=count, + _type=_type, + target_nodes=nodes[name], + **kwargs, + ) + for value in data: + yield value + cursors[name] = cur[name] + + cursors = { + name: cursor for name, cursor in cursors.items() if cursor != 0 + } + + +class RedisClusterCommands( + ClusterMultiKeyCommands, + ClusterManagementCommands, + ACLCommands, + PubSubCommands, + ClusterDataAccessCommands, + ScriptCommands, + FunctionCommands, + RedisModuleCommands, +): + """ + A class for all Redis Cluster commands + + For key-based commands, the target node(s) will be internally determined + by the keys' hash slot. + Non-key-based commands can be executed with the 'target_nodes' argument to + target specific nodes. By default, if target_nodes is not specified, the + command will be executed on the default cluster node. + + :param :target_nodes: type can be one of the followings: + - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM + - 'ClusterNode' + - 'list(ClusterNodes)' + - 'dict(any:clusterNodes)' + + for example: + r.cluster_info(target_nodes=RedisCluster.ALL_NODES) + """ + + +class AsyncRedisClusterCommands( + AsyncClusterMultiKeyCommands, + AsyncClusterManagementCommands, + AsyncACLCommands, + AsyncClusterDataAccessCommands, + AsyncScriptCommands, + AsyncFunctionCommands, +): + """ + A class for all Redis Cluster commands + + For key-based commands, the target node(s) will be internally determined + by the keys' hash slot. + Non-key-based commands can be executed with the 'target_nodes' argument to + target specific nodes. By default, if target_nodes is not specified, the + command will be executed on the default cluster node. + + :param :target_nodes: type can be one of the followings: + - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM + - 'ClusterNode' + - 'list(ClusterNodes)' + - 'dict(any:clusterNodes)' + + for example: + r.cluster_info(target_nodes=RedisCluster.ALL_NODES) + """ |