diff options
author | Bar Shaul <88437685+barshaul@users.noreply.github.com> | 2021-12-01 18:33:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-01 18:33:44 +0200 |
commit | d4252277a9dafed5af34b3f40ed7a57fc952d273 (patch) | |
tree | f0a96ff0e12c6446bd9ecbf7ed8248a0d5462d8c /redis/commands/cluster.py | |
parent | e16e26ea597e6e0c576d7462d3a2285a8647617d (diff) | |
download | redis-py-d4252277a9dafed5af34b3f40ed7a57fc952d273.tar.gz |
Migrated targeted nodes to kwargs in Cluster Mode (#1762)
Diffstat (limited to 'redis/commands/cluster.py')
-rw-r--r-- | redis/commands/cluster.py | 598 |
1 files changed, 45 insertions, 553 deletions
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index 0df073a..5d0e804 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -1,7 +1,7 @@ from redis.crc import key_slot -from redis.exceptions import ConnectionError, DataError, RedisError +from redis.exceptions import RedisClusterException, RedisError -from .core import DataAccessCommands +from .core import ACLCommands, DataAccessCommands, ManagementCommands, PubSubCommands from .helpers import list_or_args @@ -144,451 +144,31 @@ class ClusterMultiKeyCommands: return self._split_command_across_slots("UNLINK", *keys) -class ClusterManagementCommands: +class ClusterManagementCommands(ManagementCommands): """ - Redis Cluster management commands + A class for Redis Cluster management commands - Commands with the 'target_nodes' argument can be executed on specified - nodes. By default, if target_nodes is not specified, the command will be - executed on the default cluster node. - - :param :target_nodes: type can be one of the followings: - - nodes flag: 'all', 'primaries', 'replicas', 'random' - - 'ClusterNode' - - 'list(ClusterNodes)' - - 'dict(any:clusterNodes)' - - for example: - primary = r.get_primaries()[0] - r.bgsave(target_nodes=primary) - r.bgsave(target_nodes='primaries') + The class inherits from Redis's core ManagementCommands class and do the + required adjustments to work with cluster mode """ - def bgsave(self, schedule=True, target_nodes=None): - """ - Tell the Redis server to save its data to disk. Unlike save(), - this method is asynchronous and returns immediately. - """ - pieces = [] - if schedule: - pieces.append("SCHEDULE") - return self.execute_command("BGSAVE", *pieces, target_nodes=target_nodes) - - def client_getname(self, target_nodes=None): - """ - Returns the current connection name from all nodes. - The result will be a dictionary with the IP and - connection name. - """ - return self.execute_command("CLIENT GETNAME", target_nodes=target_nodes) - - def client_getredir(self, target_nodes=None): - """Returns the ID (an integer) of the client to whom we are - redirecting tracking notifications. - - see: https://redis.io/commands/client-getredir - """ - return self.execute_command("CLIENT GETREDIR", target_nodes=target_nodes) - - def client_id(self, target_nodes=None): - """Returns the current connection id""" - return self.execute_command("CLIENT ID", target_nodes=target_nodes) - - def client_info(self, target_nodes=None): - """ - Returns information and statistics about the current - client connection. - """ - return self.execute_command("CLIENT INFO", target_nodes=target_nodes) - - def client_kill_filter( - self, - _id=None, - _type=None, - addr=None, - skipme=None, - laddr=None, - user=None, - target_nodes=None, - ): - """ - Disconnects client(s) using a variety of filter options - :param id: Kills a client by its unique ID field - :param type: Kills a client by type where type is one of 'normal', - 'master', 'slave' or 'pubsub' - :param addr: Kills a client by its 'address:port' - :param skipme: If True, then the client calling the command - will not get killed even if it is identified by one of the filter - options. If skipme is not provided, the server defaults to skipme=True - :param laddr: Kills a client by its 'local (bind) address:port' - :param user: Kills a client for a specific user name - """ - args = [] - if _type is not None: - client_types = ("normal", "master", "slave", "pubsub") - if str(_type).lower() not in client_types: - raise DataError(f"CLIENT KILL type must be one of {client_types!r}") - args.extend((b"TYPE", _type)) - if skipme is not None: - if not isinstance(skipme, bool): - raise DataError("CLIENT KILL skipme must be a bool") - if skipme: - args.extend((b"SKIPME", b"YES")) - else: - args.extend((b"SKIPME", b"NO")) - if _id is not None: - args.extend((b"ID", _id)) - if addr is not None: - args.extend((b"ADDR", addr)) - if laddr is not None: - args.extend((b"LADDR", laddr)) - if user is not None: - args.extend((b"USER", user)) - if not args: - raise DataError( - "CLIENT KILL <filter> <value> ... ... <filter> " - "<value> must specify at least one filter" - ) - return self.execute_command("CLIENT KILL", *args, target_nodes=target_nodes) - - def client_kill(self, address, target_nodes=None): - "Disconnects the client at ``address`` (ip:port)" - return self.execute_command("CLIENT KILL", address, target_nodes=target_nodes) - - def client_list(self, _type=None, target_nodes=None): - """ - Returns a list of currently connected clients to the entire cluster. - If type of client specified, only that type will be returned. - :param _type: optional. one of the client types (normal, master, - replica, pubsub) - """ - if _type is not None: - client_types = ("normal", "master", "replica", "pubsub") - if str(_type).lower() not in client_types: - raise DataError(f"CLIENT LIST _type must be one of {client_types!r}") - return self.execute_command( - "CLIENT LIST", b"TYPE", _type, target_noes=target_nodes - ) - return self.execute_command("CLIENT LIST", target_nodes=target_nodes) - - def client_pause(self, timeout, target_nodes=None): - """ - Suspend all the Redis clients for the specified amount of time - :param timeout: milliseconds to pause clients - """ - if not isinstance(timeout, int): - raise DataError("CLIENT PAUSE timeout must be an integer") - return self.execute_command( - "CLIENT PAUSE", str(timeout), target_nodes=target_nodes - ) - - def client_reply(self, reply, target_nodes=None): - """Enable and disable redis server replies. - ``reply`` Must be ON OFF or SKIP, - ON - The default most with server replies to commands - OFF - Disable server responses to commands - SKIP - Skip the response of the immediately following command. - - Note: When setting OFF or SKIP replies, you will need a client object - with a timeout specified in seconds, and will need to catch the - TimeoutError. - The test_client_reply unit test illustrates this, and - conftest.py has a client with a timeout. - See https://redis.io/commands/client-reply - """ - replies = ["ON", "OFF", "SKIP"] - if reply not in replies: - raise DataError(f"CLIENT REPLY must be one of {replies!r}") - return self.execute_command("CLIENT REPLY", reply, target_nodes=target_nodes) - - def client_setname(self, name, target_nodes=None): - "Sets the current connection name" - return self.execute_command("CLIENT SETNAME", name, target_nodes=target_nodes) - - def client_trackinginfo(self, target_nodes=None): - """ - Returns the information about the current client connection's - use of the server assisted client side cache. - See https://redis.io/commands/client-trackinginfo - """ - return self.execute_command("CLIENT TRACKINGINFO", target_nodes=target_nodes) - - def client_unblock(self, client_id, error=False, target_nodes=None): - """ - Unblocks a connection by its client id. - If ``error`` is True, unblocks the client with a special error message. - If ``error`` is False (default), the client is unblocked using the - regular timeout mechanism. - """ - args = ["CLIENT UNBLOCK", int(client_id)] - if error: - args.append(b"ERROR") - return self.execute_command(*args, target_nodes=target_nodes) - - def client_unpause(self, target_nodes=None): - """ - Unpause all redis clients - """ - return self.execute_command("CLIENT UNPAUSE", target_nodes=target_nodes) - - def command(self, target_nodes=None): - """ - Returns dict reply of details about all Redis commands. - """ - return self.execute_command("COMMAND", target_nodes=target_nodes) - - def command_count(self, target_nodes=None): - """ - Returns Integer reply of number of total commands in this Redis server. - """ - return self.execute_command("COMMAND COUNT", target_nodes=target_nodes) - - def config_get(self, pattern="*", target_nodes=None): - """ - Return a dictionary of configuration based on the ``pattern`` - """ - return self.execute_command("CONFIG GET", pattern, target_nodes=target_nodes) - - def config_resetstat(self, target_nodes=None): - """Reset runtime statistics""" - return self.execute_command("CONFIG RESETSTAT", target_nodes=target_nodes) - - def config_rewrite(self, target_nodes=None): - """ - Rewrite config file with the minimal change to reflect running config. - """ - return self.execute_command("CONFIG REWRITE", target_nodes=target_nodes) - - def config_set(self, name, value, target_nodes=None): - "Set config item ``name`` with ``value``" - return self.execute_command( - "CONFIG SET", name, value, target_nodes=target_nodes - ) - - def dbsize(self, target_nodes=None): - """ - Sums the number of keys in the target nodes' DB. - - :target_nodes: 'ClusterNode' or 'list(ClusterNodes)' - The node/s to execute the command on - """ - return self.execute_command("DBSIZE", target_nodes=target_nodes) - - def debug_object(self, key): - raise NotImplementedError( - "DEBUG OBJECT is intentionally not implemented in the client." - ) - - def debug_segfault(self): - raise NotImplementedError( - "DEBUG SEGFAULT is intentionally not implemented in the client." - ) - - def echo(self, value, target_nodes): - """Echo the string back from the server""" - return self.execute_command("ECHO", value, target_nodes=target_nodes) - - def flushall(self, asynchronous=False, target_nodes=None): - """ - Delete all keys in the database. - In cluster mode this method is the same as flushdb - - ``asynchronous`` indicates whether the operation is - executed asynchronously by the server. - """ - args = [] - if asynchronous: - args.append(b"ASYNC") - return self.execute_command("FLUSHALL", *args, target_nodes=target_nodes) - - def flushdb(self, asynchronous=False, target_nodes=None): - """ - Delete all keys in the database. - - ``asynchronous`` indicates whether the operation is - executed asynchronously by the server. - """ - args = [] - if asynchronous: - args.append(b"ASYNC") - return self.execute_command("FLUSHDB", *args, target_nodes=target_nodes) - - def info(self, section=None, target_nodes=None): - """ - Returns a dictionary containing information about the Redis server - - The ``section`` option can be used to select a specific section - of information - - The section option is not supported by older versions of Redis Server, - and will generate ResponseError - """ - if section is None: - return self.execute_command("INFO", target_nodes=target_nodes) - else: - return self.execute_command("INFO", section, target_nodes=target_nodes) - - def keys(self, pattern="*", target_nodes=None): - "Returns a list of keys matching ``pattern``" - return self.execute_command("KEYS", pattern, target_nodes=target_nodes) - - def lastsave(self, target_nodes=None): - """ - Return a Python datetime object representing the last time the - Redis database was saved to disk - """ - return self.execute_command("LASTSAVE", target_nodes=target_nodes) - - def memory_doctor(self): - raise NotImplementedError( - "MEMORY DOCTOR is intentionally not implemented in the client." - ) - - def memory_help(self): - raise NotImplementedError( - "MEMORY HELP is intentionally not implemented in the client." - ) - - def memory_malloc_stats(self, target_nodes=None): - """Return an internal statistics report from the memory allocator.""" - return self.execute_command("MEMORY MALLOC-STATS", target_nodes=target_nodes) - - def memory_purge(self, target_nodes=None): - """Attempts to purge dirty pages for reclamation by allocator""" - return self.execute_command("MEMORY PURGE", target_nodes=target_nodes) + def slaveof(self, *args, **kwargs): + raise RedisClusterException("SLAVEOF is not supported in cluster mode") - def memory_stats(self, target_nodes=None): - """Return a dictionary of memory stats""" - return self.execute_command("MEMORY STATS", target_nodes=target_nodes) + def replicaof(self, *args, **kwargs): + raise RedisClusterException("REPLICAOF is not supported in cluster" " mode") - def memory_usage(self, key, samples=None): - """ - Return the total memory usage for key, its value and associated - administrative overheads. - - For nested data structures, ``samples`` is the number of elements to - sample. If left unspecified, the server's default is 5. Use 0 to sample - all elements. - """ - args = [] - if isinstance(samples, int): - args.extend([b"SAMPLES", samples]) - return self.execute_command("MEMORY USAGE", key, *args) - - def object(self, infotype, key): - """Return the encoding, idletime, or refcount about the key""" - return self.execute_command("OBJECT", infotype, key, infotype=infotype) - - def ping(self, target_nodes=None): - """ - Ping the cluster's servers. - If no target nodes are specified, sent to all nodes and returns True if - the ping was successful across all nodes. - """ - return self.execute_command("PING", target_nodes=target_nodes) - - def randomkey(self, target_nodes=None): - """ - Returns the name of a random key" - """ - return self.execute_command("RANDOMKEY", target_nodes=target_nodes) + def swapdb(self, *args, **kwargs): + raise RedisClusterException("SWAPDB is not supported in cluster" " mode") - def save(self, target_nodes=None): - """ - Tell the Redis server to save its data to disk, - blocking until the save is complete - """ - return self.execute_command("SAVE", target_nodes=target_nodes) - - def scan(self, cursor=0, match=None, count=None, _type=None, target_nodes=None): - """ - Incrementally return lists of key names. Also return a cursor - indicating the scan position. - - ``match`` allows for filtering the keys by pattern - - ``count`` provides a hint to Redis about the number of keys to - return per batch. - - ``_type`` filters the returned values by a particular Redis type. - Stock Redis instances allow for the following types: - HASH, LIST, SET, STREAM, STRING, ZSET - Additionally, Redis modules can expose other types as well. - """ - pieces = [cursor] - if match is not None: - pieces.extend([b"MATCH", match]) - if count is not None: - pieces.extend([b"COUNT", count]) - if _type is not None: - pieces.extend([b"TYPE", _type]) - return self.execute_command("SCAN", *pieces, target_nodes=target_nodes) - - def scan_iter(self, match=None, count=None, _type=None, target_nodes=None): - """ - Make an iterator using the SCAN command so that the client doesn't - need to remember the cursor position. - ``match`` allows for filtering the keys by pattern - - ``count`` provides a hint to Redis about the number of keys to - return per batch. +class ClusterDataAccessCommands(DataAccessCommands): + """ + A class for Redis Cluster Data Access Commands - ``_type`` filters the returned values by a particular Redis type. - Stock Redis instances allow for the following types: - HASH, LIST, SET, STREAM, STRING, ZSET - Additionally, Redis modules can expose other types as well. - """ - cursor = "0" - while cursor != 0: - cursor, data = self.scan( - cursor=cursor, - match=match, - count=count, - _type=_type, - target_nodes=target_nodes, - ) - yield from data - - def shutdown(self, save=False, nosave=False, target_nodes=None): - """Shutdown the Redis server. If Redis has persistence configured, - data will be flushed before shutdown. If the "save" option is set, - a data flush will be attempted even if there is no persistence - configured. If the "nosave" option is set, no data flush will be - attempted. The "save" and "nosave" options cannot both be set. - """ - if save and nosave: - raise DataError("SHUTDOWN save and nosave cannot both be set") - args = ["SHUTDOWN"] - if save: - args.append("SAVE") - if nosave: - args.append("NOSAVE") - try: - self.execute_command(*args, target_nodes=target_nodes) - except ConnectionError: - # a ConnectionError here is expected - return - raise RedisError("SHUTDOWN seems to have failed.") - - def slowlog_get(self, num=None, target_nodes=None): - """ - Get the entries from the slowlog. If ``num`` is specified, get the - most recent ``num`` items. - """ - args = ["SLOWLOG GET"] - if num is not None: - args.append(num) - - return self.execute_command(*args, target_nodes=target_nodes) - - def slowlog_len(self, target_nodes=None): - "Get the number of items in the slowlog" - return self.execute_command("SLOWLOG LEN", target_nodes=target_nodes) - - def slowlog_reset(self, target_nodes=None): - "Remove all items in the slowlog" - return self.execute_command("SLOWLOG RESET", target_nodes=target_nodes) + The class inherits from Redis's core DataAccessCommand class and do the + required adjustments to work with cluster mode + """ def stralgo( self, @@ -600,138 +180,50 @@ class ClusterManagementCommands: idx=False, minmatchlen=None, withmatchlen=False, - target_nodes=None, + **kwargs, ): - """ - Implements complex algorithms that operate on strings. - Right now the only algorithm implemented is the LCS algorithm - (longest common substring). However new algorithms could be - implemented in the future. - - ``algo`` Right now must be LCS - ``value1`` and ``value2`` Can be two strings or two keys - ``specific_argument`` Specifying if the arguments to the algorithm - will be keys or strings. strings is the default. - ``len`` Returns just the len of the match. - ``idx`` Returns the match positions in each string. - ``minmatchlen`` Restrict the list of matches to the ones of a given - minimal length. Can be provided only when ``idx`` set to True. - ``withmatchlen`` Returns the matches with the len of the match. - Can be provided only when ``idx`` set to True. - """ - # check validity - supported_algo = ["LCS"] - if algo not in supported_algo: - supported_algos_str = ", ".join(supported_algo) - raise DataError(f"The supported algorithms are: {supported_algos_str}") - if specific_argument not in ["keys", "strings"]: - raise DataError("specific_argument can be only keys or strings") - if len and idx: - raise DataError("len and idx cannot be provided together.") - - pieces = [algo, specific_argument.upper(), value1, value2] - if len: - pieces.append(b"LEN") - if idx: - pieces.append(b"IDX") - try: - int(minmatchlen) - pieces.extend([b"MINMATCHLEN", minmatchlen]) - except TypeError: - pass - if withmatchlen: - pieces.append(b"WITHMATCHLEN") + target_nodes = kwargs.pop("target_nodes", None) if specific_argument == "strings" and target_nodes is None: target_nodes = "default-node" - return self.execute_command( - "STRALGO", - *pieces, - len=len, - idx=idx, - minmatchlen=minmatchlen, - withmatchlen=withmatchlen, - target_nodes=target_nodes, - ) - - def time(self, target_nodes=None): - """ - Returns the server time as a 2-item tuple of ints: - (seconds since epoch, microseconds into this second). - """ - return self.execute_command("TIME", target_nodes=target_nodes) - - def wait(self, num_replicas, timeout, target_nodes=None): - """ - Redis synchronous replication - That returns the number of replicas that processed the query when - we finally have at least ``num_replicas``, or when the ``timeout`` was - reached. - - If more than one target node are passed the result will be summed up - """ - return self.execute_command( - "WAIT", num_replicas, timeout, target_nodes=target_nodes + kwargs.update({"target_nodes": target_nodes}) + return super().stralgo( + algo, + value1, + value2, + specific_argument, + len, + idx, + minmatchlen, + withmatchlen, + **kwargs, ) -class ClusterPubSubCommands: - """ - Redis PubSub commands for RedisCluster use. - see https://redis.io/topics/pubsub - """ - - def publish(self, channel, message, target_nodes=None): - """ - Publish ``message`` on ``channel``. - Returns the number of subscribers the message was delivered to. - """ - return self.execute_command( - "PUBLISH", channel, message, target_nodes=target_nodes - ) - - def pubsub_channels(self, pattern="*", target_nodes=None): - """ - Return a list of channels that have at least one subscriber - """ - return self.execute_command( - "PUBSUB CHANNELS", pattern, target_nodes=target_nodes - ) - - def pubsub_numpat(self, target_nodes=None): - """ - Returns the number of subscriptions to patterns - """ - return self.execute_command("PUBSUB NUMPAT", target_nodes=target_nodes) - - def pubsub_numsub(self, *args, target_nodes=None): - """ - Return a list of (channel, number of subscribers) tuples - for each channel given in ``*args`` - """ - return self.execute_command("PUBSUB NUMSUB", *args, target_nodes=target_nodes) - - -class ClusterCommands( - ClusterManagementCommands, +class RedisClusterCommands( ClusterMultiKeyCommands, - ClusterPubSubCommands, - DataAccessCommands, + ClusterManagementCommands, + ACLCommands, + PubSubCommands, + ClusterDataAccessCommands, ): """ - Redis Cluster commands + A class for all Redis Cluster commands + + For key-based commands, the target node(s) will be internally determined + by the keys' hash slot. + Non-key-based commands can be executed with the 'target_nodes' argument to + target specific nodes. By default, if target_nodes is not specified, the + command will be executed on the default cluster node. - Commands with the 'target_nodes' argument can be executed on specified - nodes. By default, if target_nodes is not specified, the command will be - executed on the default cluster node. :param :target_nodes: type can be one of the followings: - - nodes flag: 'all', 'primaries', 'replicas', 'random' + - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM - 'ClusterNode' - 'list(ClusterNodes)' - 'dict(any:clusterNodes)' for example: - r.cluster_info(target_nodes='all') + r.cluster_info(target_nodes=RedisCluster.ALL_NODES) """ def cluster_addslots(self, target_node, *slots): |