summaryrefslogtreecommitdiff
path: root/redis/commands/cluster.py
diff options
context:
space:
mode:
authorUtkarsh Gupta <utkarshgupta137@gmail.com>2022-05-08 17:34:20 +0530
committerGitHub <noreply@github.com>2022-05-08 15:04:20 +0300
commit061d97abe21d3a8ce9738330cabf771dd05c8dc1 (patch)
treee64d64c5917312a65304f4d630775d08adb38bfa /redis/commands/cluster.py
parentc25be04d6468163d31908774ed358d3fd6bc0a39 (diff)
downloadredis-py-061d97abe21d3a8ce9738330cabf771dd05c8dc1.tar.gz
Add Async RedisCluster (#2099)
* Copy Cluster Client, Commands, Commands Parser, Tests for asyncio * Async Cluster Tests: Async/Await * Add Async RedisCluster * cluster: use ERRORS_ALLOW_RETRY from self.__class__ * async_cluster: rework redis_connection, initialize, & close - move redis_connection from NodesManager to ClusterNode & handle all related logic in ClusterNode class - use Locks while initializing or closing - in case of error, close connections instead of instantly reinitializing - create ResourceWarning instead of manually deleting client object - use asyncio.gather to run commands/initialize/close in parallel - inline single use functions - fix test_acl_log for py3.6 * async_cluster: add types * async_cluster: add docs * docs: update sphinx & add sphinx_autodoc_typehints * async_cluster: move TargetNodesT to cluster module * async_cluster/commands: inherit commands from sync class if possible * async_cluster: add benchmark script with aredis & aioredis-cluster * async_cluster: remove logging * async_cluster: inline functions * async_cluster: manage Connection instead of Redis Client * async_cluster/commands: optimize parser * async_cluster: use ensure_future & generators for gather * async_conn: optimize * async_cluster: optimize determine_slot * async_cluster: optimize determine_nodes * async_cluster/parser: optimize _get_moveable_keys * async_cluster: inlined check_slots_coverage * async_cluster: update docstrings * async_cluster: add concurrent test & use read_response/_update_moved_slots without lock Co-authored-by: Chayim <chayim@users.noreply.github.com>
Diffstat (limited to 'redis/commands/cluster.py')
-rw-r--r--redis/commands/cluster.py567
1 files changed, 408 insertions, 159 deletions
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py
index aaddb6a..06b702f 100644
--- a/redis/commands/cluster.py
+++ b/redis/commands/cluster.py
@@ -1,27 +1,57 @@
-from typing import Iterator, Union
+import asyncio
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ AsyncIterator,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Mapping,
+ NoReturn,
+ Optional,
+ Union,
+)
+from redis.compat import Literal
from redis.crc import key_slot
from redis.exceptions import RedisClusterException, RedisError
-from redis.typing import PatternT
+from redis.typing import (
+ AnyKeyT,
+ ClusterCommandsProtocol,
+ EncodableT,
+ KeysT,
+ KeyT,
+ PatternT,
+)
from .core import (
ACLCommands,
+ AsyncACLCommands,
+ AsyncDataAccessCommands,
+ AsyncFunctionCommands,
+ AsyncManagementCommands,
+ AsyncScriptCommands,
DataAccessCommands,
FunctionCommands,
ManagementCommands,
PubSubCommands,
+ ResponseT,
ScriptCommands,
)
from .helpers import list_or_args
from .redismodules import RedisModuleCommands
+if TYPE_CHECKING:
+ from redis.asyncio.cluster import TargetNodesT
+
-class ClusterMultiKeyCommands:
+class ClusterMultiKeyCommands(ClusterCommandsProtocol):
"""
A class containing commands that handle more than one key
"""
- def _partition_keys_by_slot(self, keys):
+ def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
"""
Split keys into a dictionary that maps a slot to
a list of keys.
@@ -34,7 +64,7 @@ class ClusterMultiKeyCommands:
return slots_to_keys
- def mget_nonatomic(self, keys, *args):
+ def mget_nonatomic(self, keys: KeysT, *args) -> List[Optional[Any]]:
"""
Splits the keys into different slots and then calls MGET
for the keys of every slot. This operation will not be atomic
@@ -70,7 +100,7 @@ class ClusterMultiKeyCommands:
vals_in_order = [all_results[key] for key in keys]
return vals_in_order
- def mset_nonatomic(self, mapping):
+ def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
"""
Sets key/values based on a mapping. Mapping is a dictionary of
key/value pairs. Both keys and values should be strings or types that
@@ -99,7 +129,7 @@ class ClusterMultiKeyCommands:
return res
- def _split_command_across_slots(self, command, *keys):
+ def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
"""
Runs the given command once for the keys
of each slot. Returns the sum of the return values.
@@ -114,7 +144,7 @@ class ClusterMultiKeyCommands:
return total
- def exists(self, *keys):
+ def exists(self, *keys: KeyT) -> ResponseT:
"""
Returns the number of ``names`` that exist in the
whole cluster. The keys are first split up into slots
@@ -124,7 +154,7 @@ class ClusterMultiKeyCommands:
"""
return self._split_command_across_slots("EXISTS", *keys)
- def delete(self, *keys):
+ def delete(self, *keys: KeyT) -> ResponseT:
"""
Deletes the given keys in the cluster.
The keys are first split up into slots
@@ -137,7 +167,7 @@ class ClusterMultiKeyCommands:
"""
return self._split_command_across_slots("DEL", *keys)
- def touch(self, *keys):
+ def touch(self, *keys: KeyT) -> ResponseT:
"""
Updates the last access time of given keys across the
cluster.
@@ -152,7 +182,7 @@ class ClusterMultiKeyCommands:
"""
return self._split_command_across_slots("TOUCH", *keys)
- def unlink(self, *keys):
+ def unlink(self, *keys: KeyT) -> ResponseT:
"""
Remove the specified keys in a different thread.
@@ -167,160 +197,135 @@ class ClusterMultiKeyCommands:
return self._split_command_across_slots("UNLINK", *keys)
-class ClusterManagementCommands(ManagementCommands):
+class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
"""
- A class for Redis Cluster management commands
-
- The class inherits from Redis's core ManagementCommands class and do the
- required adjustments to work with cluster mode
+ A class containing commands that handle more than one key
"""
- def slaveof(self, *args, **kwargs):
+ async def mget_nonatomic(self, keys: KeysT, *args) -> List[Optional[Any]]:
"""
- Make the server a replica of another instance, or promote it as master.
+ Splits the keys into different slots and then calls MGET
+ for the keys of every slot. This operation will not be atomic
+ if keys belong to more than one slot.
- For more information see https://redis.io/commands/slaveof
- """
- raise RedisClusterException("SLAVEOF is not supported in cluster mode")
+ Returns a list of values ordered identically to ``keys``
- def replicaof(self, *args, **kwargs):
+ For more information see https://redis.io/commands/mget
"""
- Make the server a replica of another instance, or promote it as master.
- For more information see https://redis.io/commands/replicaof
- """
- raise RedisClusterException("REPLICAOF is not supported in cluster mode")
+ from redis.client import EMPTY_RESPONSE
- def swapdb(self, *args, **kwargs):
- """
- Swaps two Redis databases.
+ options = {}
+ if not args:
+ options[EMPTY_RESPONSE] = []
- For more information see https://redis.io/commands/swapdb
- """
- raise RedisClusterException("SWAPDB is not supported in cluster mode")
+ # Concatenate all keys into a list
+ keys = list_or_args(keys, args)
+ # Split keys into slots
+ slots_to_keys = self._partition_keys_by_slot(keys)
+ # Call MGET for every slot and concatenate
+ # the results
+ # We must make sure that the keys are returned in order
+ all_values = await asyncio.gather(
+ *(
+ asyncio.ensure_future(
+ self.execute_command("MGET", *slot_keys, **options)
+ )
+ for slot_keys in slots_to_keys.values()
+ )
+ )
-class ClusterDataAccessCommands(DataAccessCommands):
- """
- A class for Redis Cluster Data Access Commands
+ all_results = {}
+ for slot_keys, slot_values in zip(slots_to_keys.values(), all_values):
+ all_results.update(dict(zip(slot_keys, slot_values)))
- The class inherits from Redis's core DataAccessCommand class and do the
- required adjustments to work with cluster mode
- """
+ # Sort the results
+ vals_in_order = [all_results[key] for key in keys]
+ return vals_in_order
- def stralgo(
- self,
- algo,
- value1,
- value2,
- specific_argument="strings",
- len=False,
- idx=False,
- minmatchlen=None,
- withmatchlen=False,
- **kwargs,
- ):
+ async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
"""
- Implements complex algorithms that operate on strings.
- Right now the only algorithm implemented is the LCS algorithm
- (longest common substring). However new algorithms could be
- implemented in the future.
+ Sets key/values based on a mapping. Mapping is a dictionary of
+ key/value pairs. Both keys and values should be strings or types that
+ can be cast to a string via str().
- ``algo`` Right now must be LCS
- ``value1`` and ``value2`` Can be two strings or two keys
- ``specific_argument`` Specifying if the arguments to the algorithm
- will be keys or strings. strings is the default.
- ``len`` Returns just the len of the match.
- ``idx`` Returns the match positions in each string.
- ``minmatchlen`` Restrict the list of matches to the ones of a given
- minimal length. Can be provided only when ``idx`` set to True.
- ``withmatchlen`` Returns the matches with the len of the match.
- Can be provided only when ``idx`` set to True.
+ Splits the keys into different slots and then calls MSET
+ for the keys of every slot. This operation will not be atomic
+ if keys belong to more than one slot.
- For more information see https://redis.io/commands/stralgo
+ For more information see https://redis.io/commands/mset
"""
- target_nodes = kwargs.pop("target_nodes", None)
- if specific_argument == "strings" and target_nodes is None:
- target_nodes = "default-node"
- kwargs.update({"target_nodes": target_nodes})
- return super().stralgo(
- algo,
- value1,
- value2,
- specific_argument,
- len,
- idx,
- minmatchlen,
- withmatchlen,
- **kwargs,
- )
- def scan_iter(
- self,
- match: Union[PatternT, None] = None,
- count: Union[int, None] = None,
- _type: Union[str, None] = None,
- **kwargs,
- ) -> Iterator:
- # Do the first query with cursor=0 for all nodes
- cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
- yield from data
+ # Partition the keys by slot
+ slots_to_pairs = {}
+ for pair in mapping.items():
+ # encode the key
+ k = self.encoder.encode(pair[0])
+ slot = key_slot(k)
+ slots_to_pairs.setdefault(slot, []).extend(pair)
- cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
- if cursors:
- # Get nodes by name
- nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
+ # Call MSET for every slot and concatenate
+ # the results (one result per slot)
+ return await asyncio.gather(
+ *(
+ asyncio.ensure_future(self.execute_command("MSET", *pairs))
+ for pairs in slots_to_pairs.values()
+ )
+ )
- # Iterate over each node till its cursor is 0
- kwargs.pop("target_nodes", None)
- while cursors:
- for name, cursor in cursors.items():
- cur, data = self.scan(
- cursor=cursor,
- match=match,
- count=count,
- _type=_type,
- target_nodes=nodes[name],
- **kwargs,
- )
- yield from data
- cursors[name] = cur[name]
+ async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
+ """
+ Runs the given command once for the keys
+ of each slot. Returns the sum of the return values.
+ """
+ # Partition the keys by slot
+ slots_to_keys = self._partition_keys_by_slot(keys)
- cursors = {
- name: cursor for name, cursor in cursors.items() if cursor != 0
- }
+ # Sum up the reply from each command
+ return sum(
+ await asyncio.gather(
+ *(
+ asyncio.ensure_future(self.execute_command(command, *slot_keys))
+ for slot_keys in slots_to_keys.values()
+ )
+ )
+ )
-class RedisClusterCommands(
- ClusterMultiKeyCommands,
- ClusterManagementCommands,
- ACLCommands,
- PubSubCommands,
- ClusterDataAccessCommands,
- ScriptCommands,
- FunctionCommands,
- RedisModuleCommands,
-):
+class ClusterManagementCommands(ManagementCommands):
"""
- A class for all Redis Cluster commands
+ A class for Redis Cluster management commands
- For key-based commands, the target node(s) will be internally determined
- by the keys' hash slot.
- Non-key-based commands can be executed with the 'target_nodes' argument to
- target specific nodes. By default, if target_nodes is not specified, the
- command will be executed on the default cluster node.
+ The class inherits from Redis's core ManagementCommands class and do the
+ required adjustments to work with cluster mode
+ """
- :param :target_nodes: type can be one of the followings:
- - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
- - 'ClusterNode'
- - 'list(ClusterNodes)'
- - 'dict(any:clusterNodes)'
+ def slaveof(self, *args, **kwargs) -> NoReturn:
+ """
+ Make the server a replica of another instance, or promote it as master.
- for example:
- r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
- """
+ For more information see https://redis.io/commands/slaveof
+ """
+ raise RedisClusterException("SLAVEOF is not supported in cluster mode")
- def cluster_myid(self, target_node):
+ def replicaof(self, *args, **kwargs) -> NoReturn:
+ """
+ Make the server a replica of another instance, or promote it as master.
+
+ For more information see https://redis.io/commands/replicaof
+ """
+ raise RedisClusterException("REPLICAOF is not supported in cluster mode")
+
+ def swapdb(self, *args, **kwargs) -> NoReturn:
+ """
+ Swaps two Redis databases.
+
+ For more information see https://redis.io/commands/swapdb
+ """
+ raise RedisClusterException("SWAPDB is not supported in cluster mode")
+
+ def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
"""
Returns the node's id.
@@ -331,7 +336,9 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER MYID", target_nodes=target_node)
- def cluster_addslots(self, target_node, *slots):
+ def cluster_addslots(
+ self, target_node: "TargetNodesT", *slots: EncodableT
+ ) -> ResponseT:
"""
Assign new hash slots to receiving node. Sends to specified node.
@@ -344,7 +351,9 @@ class RedisClusterCommands(
"CLUSTER ADDSLOTS", *slots, target_nodes=target_node
)
- def cluster_addslotsrange(self, target_node, *slots):
+ def cluster_addslotsrange(
+ self, target_node: "TargetNodesT", *slots: EncodableT
+ ) -> ResponseT:
"""
Similar to the CLUSTER ADDSLOTS command.
The difference between the two commands is that ADDSLOTS takes a list of slots
@@ -360,7 +369,7 @@ class RedisClusterCommands(
"CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
)
- def cluster_countkeysinslot(self, slot_id):
+ def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
"""
Return the number of local keys in the specified hash slot
Send to node based on specified slot_id
@@ -369,7 +378,7 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)
- def cluster_count_failure_report(self, node_id):
+ def cluster_count_failure_report(self, node_id: str) -> ResponseT:
"""
Return the number of failure reports active for a given node
Sends to a random node
@@ -378,7 +387,7 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)
- def cluster_delslots(self, *slots):
+ def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
"""
Set hash slots as unbound in the cluster.
It determines by it self what node the slot is in and sends it there
@@ -389,7 +398,7 @@ class RedisClusterCommands(
"""
return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]
- def cluster_delslotsrange(self, *slots):
+ def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
"""
Similar to the CLUSTER DELSLOTS command.
The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
@@ -400,7 +409,9 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)
- def cluster_failover(self, target_node, option=None):
+ def cluster_failover(
+ self, target_node: "TargetNodesT", option: Optional[str] = None
+ ) -> ResponseT:
"""
Forces a slave to perform a manual failover of its master
Sends to specified node
@@ -422,7 +433,7 @@ class RedisClusterCommands(
else:
return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)
- def cluster_info(self, target_nodes=None):
+ def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
"""
Provides info about Redis Cluster node state.
The command will be sent to a random node in the cluster if no target
@@ -432,7 +443,7 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)
- def cluster_keyslot(self, key):
+ def cluster_keyslot(self, key: str) -> ResponseT:
"""
Returns the hash slot of the specified key
Sends to random node in the cluster
@@ -441,7 +452,9 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER KEYSLOT", key)
- def cluster_meet(self, host, port, target_nodes=None):
+ def cluster_meet(
+ self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
+ ) -> ResponseT:
"""
Force a node cluster to handshake with another node.
Sends to specified node.
@@ -452,7 +465,7 @@ class RedisClusterCommands(
"CLUSTER MEET", host, port, target_nodes=target_nodes
)
- def cluster_nodes(self):
+ def cluster_nodes(self) -> ResponseT:
"""
Get Cluster config for the node.
Sends to random node in the cluster
@@ -461,7 +474,9 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER NODES")
- def cluster_replicate(self, target_nodes, node_id):
+ def cluster_replicate(
+ self, target_nodes: "TargetNodesT", node_id: str
+ ) -> ResponseT:
"""
Reconfigure a node as a slave of the specified master node
@@ -471,7 +486,9 @@ class RedisClusterCommands(
"CLUSTER REPLICATE", node_id, target_nodes=target_nodes
)
- def cluster_reset(self, soft=True, target_nodes=None):
+ def cluster_reset(
+ self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
+ ) -> ResponseT:
"""
Reset a Redis Cluster node
@@ -484,7 +501,9 @@ class RedisClusterCommands(
"CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
)
- def cluster_save_config(self, target_nodes=None):
+ def cluster_save_config(
+ self, target_nodes: Optional["TargetNodesT"] = None
+ ) -> ResponseT:
"""
Forces the node to save cluster state on disk
@@ -492,7 +511,7 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)
- def cluster_get_keys_in_slot(self, slot, num_keys):
+ def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
"""
Returns the number of keys in the specified cluster slot
@@ -500,7 +519,9 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)
- def cluster_set_config_epoch(self, epoch, target_nodes=None):
+ def cluster_set_config_epoch(
+ self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
+ ) -> ResponseT:
"""
Set the configuration epoch in a new node
@@ -510,7 +531,9 @@ class RedisClusterCommands(
"CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
)
- def cluster_setslot(self, target_node, node_id, slot_id, state):
+ def cluster_setslot(
+ self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
+ ) -> ResponseT:
"""
Bind an hash slot to a specific node
@@ -528,7 +551,7 @@ class RedisClusterCommands(
else:
raise RedisError(f"Invalid slot state: {state}")
- def cluster_setslot_stable(self, slot_id):
+ def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
"""
Clears migrating / importing state from the slot.
It determines by it self what node the slot is in and sends it there.
@@ -537,7 +560,9 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")
- def cluster_replicas(self, node_id, target_nodes=None):
+ def cluster_replicas(
+ self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
+ ) -> ResponseT:
"""
Provides a list of replica nodes replicating from the specified primary
target node.
@@ -548,7 +573,7 @@ class RedisClusterCommands(
"CLUSTER REPLICAS", node_id, target_nodes=target_nodes
)
- def cluster_slots(self, target_nodes=None):
+ def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
"""
Get array of Cluster slot to node mappings
@@ -556,7 +581,7 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)
- def cluster_links(self, target_node):
+ def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
"""
Each node in a Redis Cluster maintains a pair of long-lived TCP link with each
peer in the cluster: One for sending outbound messages towards the peer and one
@@ -568,7 +593,7 @@ class RedisClusterCommands(
"""
return self.execute_command("CLUSTER LINKS", target_nodes=target_node)
- def readonly(self, target_nodes=None):
+ def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
"""
Enables read queries.
The command will be sent to the default cluster node if target_nodes is
@@ -582,7 +607,7 @@ class RedisClusterCommands(
self.read_from_replicas = True
return self.execute_command("READONLY", target_nodes=target_nodes)
- def readwrite(self, target_nodes=None):
+ def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
"""
Disables read queries.
The command will be sent to the default cluster node if target_nodes is
@@ -593,3 +618,227 @@ class RedisClusterCommands(
# Reset read from replicas flag
self.read_from_replicas = False
return self.execute_command("READWRITE", target_nodes=target_nodes)
+
+
+class AsyncClusterManagementCommands(
+ ClusterManagementCommands, AsyncManagementCommands
+):
+ """
+ A class for Redis Cluster management commands
+
+ The class inherits from Redis's core ManagementCommands class and do the
+ required adjustments to work with cluster mode
+ """
+
+ async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
+ """
+ Set hash slots as unbound in the cluster.
+ It determines by it self what node the slot is in and sends it there
+
+ Returns a list of the results for each processed slot.
+
+ For more information see https://redis.io/commands/cluster-delslots
+ """
+ return await asyncio.gather(
+ *(
+ asyncio.ensure_future(self.execute_command("CLUSTER DELSLOTS", slot))
+ for slot in slots
+ )
+ )
+
+
+class ClusterDataAccessCommands(DataAccessCommands):
+ """
+ A class for Redis Cluster Data Access Commands
+
+ The class inherits from Redis's core DataAccessCommand class and do the
+ required adjustments to work with cluster mode
+ """
+
+ def stralgo(
+ self,
+ algo: Literal["LCS"],
+ value1: KeyT,
+ value2: KeyT,
+ specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
+ len: bool = False,
+ idx: bool = False,
+ minmatchlen: Optional[int] = None,
+ withmatchlen: bool = False,
+ **kwargs,
+ ) -> ResponseT:
+ """
+ Implements complex algorithms that operate on strings.
+ Right now the only algorithm implemented is the LCS algorithm
+ (longest common substring). However new algorithms could be
+ implemented in the future.
+
+ ``algo`` Right now must be LCS
+ ``value1`` and ``value2`` Can be two strings or two keys
+ ``specific_argument`` Specifying if the arguments to the algorithm
+ will be keys or strings. strings is the default.
+ ``len`` Returns just the len of the match.
+ ``idx`` Returns the match positions in each string.
+ ``minmatchlen`` Restrict the list of matches to the ones of a given
+ minimal length. Can be provided only when ``idx`` set to True.
+ ``withmatchlen`` Returns the matches with the len of the match.
+ Can be provided only when ``idx`` set to True.
+
+ For more information see https://redis.io/commands/stralgo
+ """
+ target_nodes = kwargs.pop("target_nodes", None)
+ if specific_argument == "strings" and target_nodes is None:
+ target_nodes = "default-node"
+ kwargs.update({"target_nodes": target_nodes})
+ return super().stralgo(
+ algo,
+ value1,
+ value2,
+ specific_argument,
+ len,
+ idx,
+ minmatchlen,
+ withmatchlen,
+ **kwargs,
+ )
+
+ def scan_iter(
+ self,
+ match: Optional[PatternT] = None,
+ count: Optional[int] = None,
+ _type: Optional[str] = None,
+ **kwargs,
+ ) -> Iterator:
+ # Do the first query with cursor=0 for all nodes
+ cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
+ yield from data
+
+ cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
+ if cursors:
+ # Get nodes by name
+ nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
+
+ # Iterate over each node till its cursor is 0
+ kwargs.pop("target_nodes", None)
+ while cursors:
+ for name, cursor in cursors.items():
+ cur, data = self.scan(
+ cursor=cursor,
+ match=match,
+ count=count,
+ _type=_type,
+ target_nodes=nodes[name],
+ **kwargs,
+ )
+ yield from data
+ cursors[name] = cur[name]
+
+ cursors = {
+ name: cursor for name, cursor in cursors.items() if cursor != 0
+ }
+
+
+class AsyncClusterDataAccessCommands(
+ ClusterDataAccessCommands, AsyncDataAccessCommands
+):
+ """
+ A class for Redis Cluster Data Access Commands
+
+ The class inherits from Redis's core DataAccessCommand class and do the
+ required adjustments to work with cluster mode
+ """
+
+ async def scan_iter(
+ self,
+ match: Optional[PatternT] = None,
+ count: Optional[int] = None,
+ _type: Optional[str] = None,
+ **kwargs,
+ ) -> AsyncIterator:
+ # Do the first query with cursor=0 for all nodes
+ cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs)
+ for value in data:
+ yield value
+
+ cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
+ if cursors:
+ # Get nodes by name
+ nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
+
+ # Iterate over each node till its cursor is 0
+ kwargs.pop("target_nodes", None)
+ while cursors:
+ for name, cursor in cursors.items():
+ cur, data = await self.scan(
+ cursor=cursor,
+ match=match,
+ count=count,
+ _type=_type,
+ target_nodes=nodes[name],
+ **kwargs,
+ )
+ for value in data:
+ yield value
+ cursors[name] = cur[name]
+
+ cursors = {
+ name: cursor for name, cursor in cursors.items() if cursor != 0
+ }
+
+
+class RedisClusterCommands(
+ ClusterMultiKeyCommands,
+ ClusterManagementCommands,
+ ACLCommands,
+ PubSubCommands,
+ ClusterDataAccessCommands,
+ ScriptCommands,
+ FunctionCommands,
+ RedisModuleCommands,
+):
+ """
+ A class for all Redis Cluster commands
+
+ For key-based commands, the target node(s) will be internally determined
+ by the keys' hash slot.
+ Non-key-based commands can be executed with the 'target_nodes' argument to
+ target specific nodes. By default, if target_nodes is not specified, the
+ command will be executed on the default cluster node.
+
+ :param :target_nodes: type can be one of the followings:
+ - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
+ - 'ClusterNode'
+ - 'list(ClusterNodes)'
+ - 'dict(any:clusterNodes)'
+
+ for example:
+ r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
+ """
+
+
+class AsyncRedisClusterCommands(
+ AsyncClusterMultiKeyCommands,
+ AsyncClusterManagementCommands,
+ AsyncACLCommands,
+ AsyncClusterDataAccessCommands,
+ AsyncScriptCommands,
+ AsyncFunctionCommands,
+):
+ """
+ A class for all Redis Cluster commands
+
+ For key-based commands, the target node(s) will be internally determined
+ by the keys' hash slot.
+ Non-key-based commands can be executed with the 'target_nodes' argument to
+ target specific nodes. By default, if target_nodes is not specified, the
+ command will be executed on the default cluster node.
+
+ :param :target_nodes: type can be one of the followings:
+ - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
+ - 'ClusterNode'
+ - 'list(ClusterNodes)'
+ - 'dict(any:clusterNodes)'
+
+ for example:
+ r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
+ """