summaryrefslogtreecommitdiff
path: root/redis/commands/cluster.py
diff options
context:
space:
mode:
authorBar Shaul <88437685+barshaul@users.noreply.github.com>2021-12-01 18:33:44 +0200
committerGitHub <noreply@github.com>2021-12-01 18:33:44 +0200
commitd4252277a9dafed5af34b3f40ed7a57fc952d273 (patch)
treef0a96ff0e12c6446bd9ecbf7ed8248a0d5462d8c /redis/commands/cluster.py
parente16e26ea597e6e0c576d7462d3a2285a8647617d (diff)
downloadredis-py-d4252277a9dafed5af34b3f40ed7a57fc952d273.tar.gz
Migrated targeted nodes to kwargs in Cluster Mode (#1762)
Diffstat (limited to 'redis/commands/cluster.py')
-rw-r--r--redis/commands/cluster.py598
1 file changed, 45 insertions, 553 deletions
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py
index 0df073a..5d0e804 100644
--- a/redis/commands/cluster.py
+++ b/redis/commands/cluster.py
@@ -1,7 +1,7 @@
from redis.crc import key_slot
-from redis.exceptions import ConnectionError, DataError, RedisError
+from redis.exceptions import RedisClusterException, RedisError
-from .core import DataAccessCommands
+from .core import ACLCommands, DataAccessCommands, ManagementCommands, PubSubCommands
from .helpers import list_or_args
@@ -144,451 +144,31 @@ class ClusterMultiKeyCommands:
return self._split_command_across_slots("UNLINK", *keys)
-class ClusterManagementCommands:
+class ClusterManagementCommands(ManagementCommands):
"""
- Redis Cluster management commands
+ A class for Redis Cluster management commands
- Commands with the 'target_nodes' argument can be executed on specified
- nodes. By default, if target_nodes is not specified, the command will be
- executed on the default cluster node.
-
- :param :target_nodes: type can be one of the followings:
- - nodes flag: 'all', 'primaries', 'replicas', 'random'
- - 'ClusterNode'
- - 'list(ClusterNodes)'
- - 'dict(any:clusterNodes)'
-
- for example:
- primary = r.get_primaries()[0]
- r.bgsave(target_nodes=primary)
- r.bgsave(target_nodes='primaries')
+ The class inherits from Redis's core ManagementCommands class and do the
+ required adjustments to work with cluster mode
"""
- def bgsave(self, schedule=True, target_nodes=None):
- """
- Tell the Redis server to save its data to disk. Unlike save(),
- this method is asynchronous and returns immediately.
- """
- pieces = []
- if schedule:
- pieces.append("SCHEDULE")
- return self.execute_command("BGSAVE", *pieces, target_nodes=target_nodes)
-
- def client_getname(self, target_nodes=None):
- """
- Returns the current connection name from all nodes.
- The result will be a dictionary with the IP and
- connection name.
- """
- return self.execute_command("CLIENT GETNAME", target_nodes=target_nodes)
-
- def client_getredir(self, target_nodes=None):
- """Returns the ID (an integer) of the client to whom we are
- redirecting tracking notifications.
-
- see: https://redis.io/commands/client-getredir
- """
- return self.execute_command("CLIENT GETREDIR", target_nodes=target_nodes)
-
- def client_id(self, target_nodes=None):
- """Returns the current connection id"""
- return self.execute_command("CLIENT ID", target_nodes=target_nodes)
-
- def client_info(self, target_nodes=None):
- """
- Returns information and statistics about the current
- client connection.
- """
- return self.execute_command("CLIENT INFO", target_nodes=target_nodes)
-
- def client_kill_filter(
- self,
- _id=None,
- _type=None,
- addr=None,
- skipme=None,
- laddr=None,
- user=None,
- target_nodes=None,
- ):
- """
- Disconnects client(s) using a variety of filter options
- :param id: Kills a client by its unique ID field
- :param type: Kills a client by type where type is one of 'normal',
- 'master', 'slave' or 'pubsub'
- :param addr: Kills a client by its 'address:port'
- :param skipme: If True, then the client calling the command
- will not get killed even if it is identified by one of the filter
- options. If skipme is not provided, the server defaults to skipme=True
- :param laddr: Kills a client by its 'local (bind) address:port'
- :param user: Kills a client for a specific user name
- """
- args = []
- if _type is not None:
- client_types = ("normal", "master", "slave", "pubsub")
- if str(_type).lower() not in client_types:
- raise DataError(f"CLIENT KILL type must be one of {client_types!r}")
- args.extend((b"TYPE", _type))
- if skipme is not None:
- if not isinstance(skipme, bool):
- raise DataError("CLIENT KILL skipme must be a bool")
- if skipme:
- args.extend((b"SKIPME", b"YES"))
- else:
- args.extend((b"SKIPME", b"NO"))
- if _id is not None:
- args.extend((b"ID", _id))
- if addr is not None:
- args.extend((b"ADDR", addr))
- if laddr is not None:
- args.extend((b"LADDR", laddr))
- if user is not None:
- args.extend((b"USER", user))
- if not args:
- raise DataError(
- "CLIENT KILL <filter> <value> ... ... <filter> "
- "<value> must specify at least one filter"
- )
- return self.execute_command("CLIENT KILL", *args, target_nodes=target_nodes)
-
- def client_kill(self, address, target_nodes=None):
- "Disconnects the client at ``address`` (ip:port)"
- return self.execute_command("CLIENT KILL", address, target_nodes=target_nodes)
-
- def client_list(self, _type=None, target_nodes=None):
- """
- Returns a list of currently connected clients to the entire cluster.
- If type of client specified, only that type will be returned.
- :param _type: optional. one of the client types (normal, master,
- replica, pubsub)
- """
- if _type is not None:
- client_types = ("normal", "master", "replica", "pubsub")
- if str(_type).lower() not in client_types:
- raise DataError(f"CLIENT LIST _type must be one of {client_types!r}")
- return self.execute_command(
- "CLIENT LIST", b"TYPE", _type, target_noes=target_nodes
- )
- return self.execute_command("CLIENT LIST", target_nodes=target_nodes)
-
- def client_pause(self, timeout, target_nodes=None):
- """
- Suspend all the Redis clients for the specified amount of time
- :param timeout: milliseconds to pause clients
- """
- if not isinstance(timeout, int):
- raise DataError("CLIENT PAUSE timeout must be an integer")
- return self.execute_command(
- "CLIENT PAUSE", str(timeout), target_nodes=target_nodes
- )
-
- def client_reply(self, reply, target_nodes=None):
- """Enable and disable redis server replies.
- ``reply`` Must be ON OFF or SKIP,
- ON - The default most with server replies to commands
- OFF - Disable server responses to commands
- SKIP - Skip the response of the immediately following command.
-
- Note: When setting OFF or SKIP replies, you will need a client object
- with a timeout specified in seconds, and will need to catch the
- TimeoutError.
- The test_client_reply unit test illustrates this, and
- conftest.py has a client with a timeout.
- See https://redis.io/commands/client-reply
- """
- replies = ["ON", "OFF", "SKIP"]
- if reply not in replies:
- raise DataError(f"CLIENT REPLY must be one of {replies!r}")
- return self.execute_command("CLIENT REPLY", reply, target_nodes=target_nodes)
-
- def client_setname(self, name, target_nodes=None):
- "Sets the current connection name"
- return self.execute_command("CLIENT SETNAME", name, target_nodes=target_nodes)
-
- def client_trackinginfo(self, target_nodes=None):
- """
- Returns the information about the current client connection's
- use of the server assisted client side cache.
- See https://redis.io/commands/client-trackinginfo
- """
- return self.execute_command("CLIENT TRACKINGINFO", target_nodes=target_nodes)
-
- def client_unblock(self, client_id, error=False, target_nodes=None):
- """
- Unblocks a connection by its client id.
- If ``error`` is True, unblocks the client with a special error message.
- If ``error`` is False (default), the client is unblocked using the
- regular timeout mechanism.
- """
- args = ["CLIENT UNBLOCK", int(client_id)]
- if error:
- args.append(b"ERROR")
- return self.execute_command(*args, target_nodes=target_nodes)
-
- def client_unpause(self, target_nodes=None):
- """
- Unpause all redis clients
- """
- return self.execute_command("CLIENT UNPAUSE", target_nodes=target_nodes)
-
- def command(self, target_nodes=None):
- """
- Returns dict reply of details about all Redis commands.
- """
- return self.execute_command("COMMAND", target_nodes=target_nodes)
-
- def command_count(self, target_nodes=None):
- """
- Returns Integer reply of number of total commands in this Redis server.
- """
- return self.execute_command("COMMAND COUNT", target_nodes=target_nodes)
-
- def config_get(self, pattern="*", target_nodes=None):
- """
- Return a dictionary of configuration based on the ``pattern``
- """
- return self.execute_command("CONFIG GET", pattern, target_nodes=target_nodes)
-
- def config_resetstat(self, target_nodes=None):
- """Reset runtime statistics"""
- return self.execute_command("CONFIG RESETSTAT", target_nodes=target_nodes)
-
- def config_rewrite(self, target_nodes=None):
- """
- Rewrite config file with the minimal change to reflect running config.
- """
- return self.execute_command("CONFIG REWRITE", target_nodes=target_nodes)
-
- def config_set(self, name, value, target_nodes=None):
- "Set config item ``name`` with ``value``"
- return self.execute_command(
- "CONFIG SET", name, value, target_nodes=target_nodes
- )
-
- def dbsize(self, target_nodes=None):
- """
- Sums the number of keys in the target nodes' DB.
-
- :target_nodes: 'ClusterNode' or 'list(ClusterNodes)'
- The node/s to execute the command on
- """
- return self.execute_command("DBSIZE", target_nodes=target_nodes)
-
- def debug_object(self, key):
- raise NotImplementedError(
- "DEBUG OBJECT is intentionally not implemented in the client."
- )
-
- def debug_segfault(self):
- raise NotImplementedError(
- "DEBUG SEGFAULT is intentionally not implemented in the client."
- )
-
- def echo(self, value, target_nodes):
- """Echo the string back from the server"""
- return self.execute_command("ECHO", value, target_nodes=target_nodes)
-
- def flushall(self, asynchronous=False, target_nodes=None):
- """
- Delete all keys in the database.
- In cluster mode this method is the same as flushdb
-
- ``asynchronous`` indicates whether the operation is
- executed asynchronously by the server.
- """
- args = []
- if asynchronous:
- args.append(b"ASYNC")
- return self.execute_command("FLUSHALL", *args, target_nodes=target_nodes)
-
- def flushdb(self, asynchronous=False, target_nodes=None):
- """
- Delete all keys in the database.
-
- ``asynchronous`` indicates whether the operation is
- executed asynchronously by the server.
- """
- args = []
- if asynchronous:
- args.append(b"ASYNC")
- return self.execute_command("FLUSHDB", *args, target_nodes=target_nodes)
-
- def info(self, section=None, target_nodes=None):
- """
- Returns a dictionary containing information about the Redis server
-
- The ``section`` option can be used to select a specific section
- of information
-
- The section option is not supported by older versions of Redis Server,
- and will generate ResponseError
- """
- if section is None:
- return self.execute_command("INFO", target_nodes=target_nodes)
- else:
- return self.execute_command("INFO", section, target_nodes=target_nodes)
-
- def keys(self, pattern="*", target_nodes=None):
- "Returns a list of keys matching ``pattern``"
- return self.execute_command("KEYS", pattern, target_nodes=target_nodes)
-
- def lastsave(self, target_nodes=None):
- """
- Return a Python datetime object representing the last time the
- Redis database was saved to disk
- """
- return self.execute_command("LASTSAVE", target_nodes=target_nodes)
-
- def memory_doctor(self):
- raise NotImplementedError(
- "MEMORY DOCTOR is intentionally not implemented in the client."
- )
-
- def memory_help(self):
- raise NotImplementedError(
- "MEMORY HELP is intentionally not implemented in the client."
- )
-
- def memory_malloc_stats(self, target_nodes=None):
- """Return an internal statistics report from the memory allocator."""
- return self.execute_command("MEMORY MALLOC-STATS", target_nodes=target_nodes)
-
- def memory_purge(self, target_nodes=None):
- """Attempts to purge dirty pages for reclamation by allocator"""
- return self.execute_command("MEMORY PURGE", target_nodes=target_nodes)
+ def slaveof(self, *args, **kwargs):
+ raise RedisClusterException("SLAVEOF is not supported in cluster mode")
- def memory_stats(self, target_nodes=None):
- """Return a dictionary of memory stats"""
- return self.execute_command("MEMORY STATS", target_nodes=target_nodes)
+ def replicaof(self, *args, **kwargs):
+ raise RedisClusterException("REPLICAOF is not supported in cluster" " mode")
- def memory_usage(self, key, samples=None):
- """
- Return the total memory usage for key, its value and associated
- administrative overheads.
-
- For nested data structures, ``samples`` is the number of elements to
- sample. If left unspecified, the server's default is 5. Use 0 to sample
- all elements.
- """
- args = []
- if isinstance(samples, int):
- args.extend([b"SAMPLES", samples])
- return self.execute_command("MEMORY USAGE", key, *args)
-
- def object(self, infotype, key):
- """Return the encoding, idletime, or refcount about the key"""
- return self.execute_command("OBJECT", infotype, key, infotype=infotype)
-
- def ping(self, target_nodes=None):
- """
- Ping the cluster's servers.
- If no target nodes are specified, sent to all nodes and returns True if
- the ping was successful across all nodes.
- """
- return self.execute_command("PING", target_nodes=target_nodes)
-
- def randomkey(self, target_nodes=None):
- """
- Returns the name of a random key"
- """
- return self.execute_command("RANDOMKEY", target_nodes=target_nodes)
+ def swapdb(self, *args, **kwargs):
+ raise RedisClusterException("SWAPDB is not supported in cluster" " mode")
- def save(self, target_nodes=None):
- """
- Tell the Redis server to save its data to disk,
- blocking until the save is complete
- """
- return self.execute_command("SAVE", target_nodes=target_nodes)
-
- def scan(self, cursor=0, match=None, count=None, _type=None, target_nodes=None):
- """
- Incrementally return lists of key names. Also return a cursor
- indicating the scan position.
-
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of keys to
- return per batch.
-
- ``_type`` filters the returned values by a particular Redis type.
- Stock Redis instances allow for the following types:
- HASH, LIST, SET, STREAM, STRING, ZSET
- Additionally, Redis modules can expose other types as well.
- """
- pieces = [cursor]
- if match is not None:
- pieces.extend([b"MATCH", match])
- if count is not None:
- pieces.extend([b"COUNT", count])
- if _type is not None:
- pieces.extend([b"TYPE", _type])
- return self.execute_command("SCAN", *pieces, target_nodes=target_nodes)
-
- def scan_iter(self, match=None, count=None, _type=None, target_nodes=None):
- """
- Make an iterator using the SCAN command so that the client doesn't
- need to remember the cursor position.
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of keys to
- return per batch.
+class ClusterDataAccessCommands(DataAccessCommands):
+ """
+ A class for Redis Cluster Data Access Commands
- ``_type`` filters the returned values by a particular Redis type.
- Stock Redis instances allow for the following types:
- HASH, LIST, SET, STREAM, STRING, ZSET
- Additionally, Redis modules can expose other types as well.
- """
- cursor = "0"
- while cursor != 0:
- cursor, data = self.scan(
- cursor=cursor,
- match=match,
- count=count,
- _type=_type,
- target_nodes=target_nodes,
- )
- yield from data
-
- def shutdown(self, save=False, nosave=False, target_nodes=None):
- """Shutdown the Redis server. If Redis has persistence configured,
- data will be flushed before shutdown. If the "save" option is set,
- a data flush will be attempted even if there is no persistence
- configured. If the "nosave" option is set, no data flush will be
- attempted. The "save" and "nosave" options cannot both be set.
- """
- if save and nosave:
- raise DataError("SHUTDOWN save and nosave cannot both be set")
- args = ["SHUTDOWN"]
- if save:
- args.append("SAVE")
- if nosave:
- args.append("NOSAVE")
- try:
- self.execute_command(*args, target_nodes=target_nodes)
- except ConnectionError:
- # a ConnectionError here is expected
- return
- raise RedisError("SHUTDOWN seems to have failed.")
-
- def slowlog_get(self, num=None, target_nodes=None):
- """
- Get the entries from the slowlog. If ``num`` is specified, get the
- most recent ``num`` items.
- """
- args = ["SLOWLOG GET"]
- if num is not None:
- args.append(num)
-
- return self.execute_command(*args, target_nodes=target_nodes)
-
- def slowlog_len(self, target_nodes=None):
- "Get the number of items in the slowlog"
- return self.execute_command("SLOWLOG LEN", target_nodes=target_nodes)
-
- def slowlog_reset(self, target_nodes=None):
- "Remove all items in the slowlog"
- return self.execute_command("SLOWLOG RESET", target_nodes=target_nodes)
+ The class inherits from Redis's core DataAccessCommand class and do the
+ required adjustments to work with cluster mode
+ """
def stralgo(
self,
@@ -600,138 +180,50 @@ class ClusterManagementCommands:
idx=False,
minmatchlen=None,
withmatchlen=False,
- target_nodes=None,
+ **kwargs,
):
- """
- Implements complex algorithms that operate on strings.
- Right now the only algorithm implemented is the LCS algorithm
- (longest common substring). However new algorithms could be
- implemented in the future.
-
- ``algo`` Right now must be LCS
- ``value1`` and ``value2`` Can be two strings or two keys
- ``specific_argument`` Specifying if the arguments to the algorithm
- will be keys or strings. strings is the default.
- ``len`` Returns just the len of the match.
- ``idx`` Returns the match positions in each string.
- ``minmatchlen`` Restrict the list of matches to the ones of a given
- minimal length. Can be provided only when ``idx`` set to True.
- ``withmatchlen`` Returns the matches with the len of the match.
- Can be provided only when ``idx`` set to True.
- """
- # check validity
- supported_algo = ["LCS"]
- if algo not in supported_algo:
- supported_algos_str = ", ".join(supported_algo)
- raise DataError(f"The supported algorithms are: {supported_algos_str}")
- if specific_argument not in ["keys", "strings"]:
- raise DataError("specific_argument can be only keys or strings")
- if len and idx:
- raise DataError("len and idx cannot be provided together.")
-
- pieces = [algo, specific_argument.upper(), value1, value2]
- if len:
- pieces.append(b"LEN")
- if idx:
- pieces.append(b"IDX")
- try:
- int(minmatchlen)
- pieces.extend([b"MINMATCHLEN", minmatchlen])
- except TypeError:
- pass
- if withmatchlen:
- pieces.append(b"WITHMATCHLEN")
+ target_nodes = kwargs.pop("target_nodes", None)
if specific_argument == "strings" and target_nodes is None:
target_nodes = "default-node"
- return self.execute_command(
- "STRALGO",
- *pieces,
- len=len,
- idx=idx,
- minmatchlen=minmatchlen,
- withmatchlen=withmatchlen,
- target_nodes=target_nodes,
- )
-
- def time(self, target_nodes=None):
- """
- Returns the server time as a 2-item tuple of ints:
- (seconds since epoch, microseconds into this second).
- """
- return self.execute_command("TIME", target_nodes=target_nodes)
-
- def wait(self, num_replicas, timeout, target_nodes=None):
- """
- Redis synchronous replication
- That returns the number of replicas that processed the query when
- we finally have at least ``num_replicas``, or when the ``timeout`` was
- reached.
-
- If more than one target node are passed the result will be summed up
- """
- return self.execute_command(
- "WAIT", num_replicas, timeout, target_nodes=target_nodes
+ kwargs.update({"target_nodes": target_nodes})
+ return super().stralgo(
+ algo,
+ value1,
+ value2,
+ specific_argument,
+ len,
+ idx,
+ minmatchlen,
+ withmatchlen,
+ **kwargs,
)
-class ClusterPubSubCommands:
- """
- Redis PubSub commands for RedisCluster use.
- see https://redis.io/topics/pubsub
- """
-
- def publish(self, channel, message, target_nodes=None):
- """
- Publish ``message`` on ``channel``.
- Returns the number of subscribers the message was delivered to.
- """
- return self.execute_command(
- "PUBLISH", channel, message, target_nodes=target_nodes
- )
-
- def pubsub_channels(self, pattern="*", target_nodes=None):
- """
- Return a list of channels that have at least one subscriber
- """
- return self.execute_command(
- "PUBSUB CHANNELS", pattern, target_nodes=target_nodes
- )
-
- def pubsub_numpat(self, target_nodes=None):
- """
- Returns the number of subscriptions to patterns
- """
- return self.execute_command("PUBSUB NUMPAT", target_nodes=target_nodes)
-
- def pubsub_numsub(self, *args, target_nodes=None):
- """
- Return a list of (channel, number of subscribers) tuples
- for each channel given in ``*args``
- """
- return self.execute_command("PUBSUB NUMSUB", *args, target_nodes=target_nodes)
-
-
-class ClusterCommands(
- ClusterManagementCommands,
+class RedisClusterCommands(
ClusterMultiKeyCommands,
- ClusterPubSubCommands,
- DataAccessCommands,
+ ClusterManagementCommands,
+ ACLCommands,
+ PubSubCommands,
+ ClusterDataAccessCommands,
):
"""
- Redis Cluster commands
+ A class for all Redis Cluster commands
+
+ For key-based commands, the target node(s) will be internally determined
+ by the keys' hash slot.
+ Non-key-based commands can be executed with the 'target_nodes' argument to
+ target specific nodes. By default, if target_nodes is not specified, the
+ command will be executed on the default cluster node.
- Commands with the 'target_nodes' argument can be executed on specified
- nodes. By default, if target_nodes is not specified, the command will be
- executed on the default cluster node.
:param :target_nodes: type can be one of the followings:
- - nodes flag: 'all', 'primaries', 'replicas', 'random'
+ - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
- 'ClusterNode'
- 'list(ClusterNodes)'
- 'dict(any:clusterNodes)'
for example:
- r.cluster_info(target_nodes='all')
+ r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
"""
def cluster_addslots(self, target_node, *slots):