diff options
author | Bar Shaul <88437685+barshaul@users.noreply.github.com> | 2021-12-02 22:59:06 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-02 22:59:06 +0200 |
commit | 11b14630a6845c28acfd4220b72ed62d72913305 (patch) | |
tree | 7c4ae49db2320416a956551613d90cad26b44052 | |
parent | b7ffec08da97b71b10bbd139b32ff82d33d907f1 (diff) | |
download | redis-py-11b14630a6845c28acfd4220b72ed62d72913305.tar.gz |
Added support for MONITOR in clusters (#1756)
-rw-r--r-- | redis/cluster.py | 17 | ||||
-rw-r--r-- | tests/conftest.py | 20 | ||||
-rw-r--r-- | tests/test_cluster.py | 54 | ||||
-rw-r--r-- | tests/test_commands.py | 53 | ||||
-rw-r--r-- | tests/test_connection_pool.py | 12 | ||||
-rw-r--r-- | tests/test_monitor.py | 4 | ||||
-rw-r--r-- | tests/test_pubsub.py | 2 |
7 files changed, 117 insertions, 45 deletions
diff --git a/redis/cluster.py b/redis/cluster.py index c5634a0..b1adeb7 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -670,6 +670,23 @@ class RedisCluster(RedisClusterCommands): log.info(f"Changed the default cluster node to {node}") return True + def monitor(self, target_node=None): + """ + Returns a Monitor object for the specified target node. + The default cluster node will be selected if no target node was + specified. + Monitor is useful for handling the MONITOR command to the redis server. + next_command() method returns one command from monitor + listen() method yields commands from monitor. + """ + if target_node is None: + target_node = self.get_default_node() + if target_node.redis_connection is None: + raise RedisClusterException( + f"Cluster Node {target_node.name} has no redis_connection" + ) + return target_node.redis_connection.monitor() + def pubsub(self, node=None, host=None, port=None, **kwargs): """ Allows passing a ClusterNode, or host&port, to get a pubsub instance diff --git a/tests/conftest.py b/tests/conftest.py index 24783c0..ab29ee4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -151,12 +151,12 @@ def skip_ifmodversion_lt(min_version: str, module_name: str): raise AttributeError(f"No redis module named {module_name}") -def skip_if_redis_enterprise(func): +def skip_if_redis_enterprise(): check = REDIS_INFO["enterprise"] is True return pytest.mark.skipif(check, reason="Redis enterprise") -def skip_ifnot_redis_enterprise(func): +def skip_ifnot_redis_enterprise(): check = REDIS_INFO["enterprise"] is False return pytest.mark.skipif(check, reason="Not running in redis enterprise") @@ -324,16 +324,18 @@ def master_host(request): yield parts.hostname, parts.port -def wait_for_command(client, monitor, command): +def wait_for_command(client, monitor, command, key=None): # issue a command with a key name that's local to this process. # if we find a command with our key before the command we're waiting # for, something went wrong - redis_version = REDIS_INFO["version"] - if LooseVersion(redis_version) >= LooseVersion("5.0.0"): - id_str = str(client.client_id()) - else: - id_str = f"{random.randrange(2 ** 32):08x}" - key = f"__REDIS-PY-{id_str}__" + if key is None: + # generate key + redis_version = REDIS_INFO["version"] + if LooseVersion(redis_version) >= LooseVersion("5.0.0"): + id_str = str(client.client_id()) + else: + id_str = f"{random.randrange(2 ** 32):08x}" + key = f"__REDIS-PY-{id_str}__" client.get(key) while True: monitor_response = monitor.next_command() diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 4087d33..15d8ac6 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -36,6 +36,7 @@ from .conftest import ( skip_if_redis_enterprise, skip_if_server_version_lt, skip_unless_arch_bits, + wait_for_command, ) default_host = "127.0.0.1" @@ -1774,7 +1775,7 @@ class TestClusterRedisCommands: assert r.randomkey(target_nodes=node) in (b"{foo}a", b"{foo}b", b"{foo}c") @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_acl_log(self, r, request): key = "{cache}:" node = r.get_node_from_key(key) @@ -2631,3 +2632,54 @@ class TestReadOnlyPipeline: if executed_on_replica: break assert executed_on_replica is True + + +@pytest.mark.onlycluster +class TestClusterMonitor: + def test_wait_command_not_found(self, r): + "Make sure the wait_for_command func works when command is not found" + key = "foo" + node = r.get_node_from_key(key) + with r.monitor(target_node=node) as m: + response = wait_for_command(r, m, "nothing", key=key) + assert response is None + + def test_response_values(self, r): + db = 0 + key = "foo" + node = r.get_node_from_key(key) + with r.monitor(target_node=node) as m: + r.ping(target_nodes=node) + response = wait_for_command(r, m, "PING", key=key) + assert isinstance(response["time"], float) + assert response["db"] == db + assert response["client_type"] in ("tcp", "unix") + assert isinstance(response["client_address"], str) + assert isinstance(response["client_port"], str) + assert response["command"] == "PING" + + def test_command_with_quoted_key(self, r): + key = "{foo}1" + node = r.get_node_from_key(key) + with r.monitor(node) as m: + r.get('{foo}"bar') + response = wait_for_command(r, m, 'GET {foo}"bar', key=key) + assert response["command"] == 'GET {foo}"bar' + + def test_command_with_binary_data(self, r): + key = "{foo}1" + node = r.get_node_from_key(key) + with r.monitor(target_node=node) as m: + byte_string = b"{foo}bar\x92" + r.get(byte_string) + response = wait_for_command(r, m, "GET {foo}bar\\x92", key=key) + assert response["command"] == "GET {foo}bar\\x92" + + def test_command_with_escaped_data(self, r): + key = "{foo}1" + node = r.get_node_from_key(key) + with r.monitor(target_node=node) as m: + byte_string = b"{foo}bar\\x92" + r.get(byte_string) + response = wait_for_command(r, m, "GET {foo}bar\\\\x92", key=key) + assert response["command"] == "GET {foo}bar\\\\x92" diff --git a/tests/test_commands.py b/tests/test_commands.py index 556df84..936cbe5 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -84,7 +84,7 @@ class TestRedisCommands: assert "get" in commands @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_acl_deluser(self, r, request): username = "redis-py-user" @@ -109,7 +109,7 @@ class TestRedisCommands: assert r.acl_getuser(users[4]) is None @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_acl_genpass(self, r): password = r.acl_genpass() assert isinstance(password, str) @@ -123,7 +123,7 @@ class TestRedisCommands: assert isinstance(password, str) @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_acl_getuser_setuser(self, r, request): username = "redis-py-user" @@ -236,7 +236,7 @@ class TestRedisCommands: assert len(res) != 0 @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_acl_list(self, r, request): username = "redis-py-user" @@ -250,7 +250,8 @@ class TestRedisCommands: assert len(users) == 2 @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() + @pytest.mark.onlynoncluster def test_acl_log(self, r, request): username = "redis-py-user" @@ -292,7 +293,7 @@ class TestRedisCommands: assert r.acl_log_reset() @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_acl_setuser_categories_without_prefix_fails(self, r, request): username = "redis-py-user" @@ -305,7 +306,7 @@ class TestRedisCommands: r.acl_setuser(username, categories=["list"]) @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_acl_setuser_commands_without_prefix_fails(self, r, request): username = "redis-py-user" @@ -318,7 +319,7 @@ class TestRedisCommands: r.acl_setuser(username, commands=["get"]) @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_acl_setuser_add_passwords_and_nopass_fails(self, r, request): username = "redis-py-user" @@ -363,7 +364,7 @@ class TestRedisCommands: clients = r.client_list(_type=client_type) assert isinstance(clients, list) - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_client_list_replica(self, r): clients = r.client_list(_type="replica") assert isinstance(clients, list) @@ -529,7 +530,7 @@ class TestRedisCommands: assert r.client_kill_filter(laddr=client_2_addr) @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_client_kill_filter_by_user(self, r, request): killuser = "user_to_kill" r.acl_setuser( @@ -549,7 +550,7 @@ class TestRedisCommands: @pytest.mark.onlynoncluster @skip_if_server_version_lt("2.9.50") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_client_pause(self, r): assert r.client_pause(1) assert r.client_pause(timeout=1) @@ -558,7 +559,7 @@ class TestRedisCommands: @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.2.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_client_unpause(self, r): assert r.client_unpause() == b"OK" @@ -578,7 +579,7 @@ class TestRedisCommands: @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_client_getredir(self, r): assert isinstance(r.client_getredir(), int) assert r.client_getredir() == -1 @@ -590,7 +591,7 @@ class TestRedisCommands: # assert data['maxmemory'].isdigit() @pytest.mark.onlynoncluster - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_config_resetstat(self, r): r.ping() prior_commands_processed = int(r.info()["total_commands_processed"]) @@ -599,7 +600,7 @@ class TestRedisCommands: reset_commands_processed = int(r.info()["total_commands_processed"]) assert reset_commands_processed < prior_commands_processed - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_config_set(self, r): r.config_set("timeout", 70) assert r.config_get()["timeout"] == "70" @@ -626,7 +627,7 @@ class TestRedisCommands: assert "redis_version" in info.keys() @pytest.mark.onlynoncluster - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_lastsave(self, r): assert isinstance(r.lastsave(), datetime.datetime) @@ -731,7 +732,7 @@ class TestRedisCommands: assert isinstance(t[0], int) assert isinstance(t[1], int) - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_bgsave(self, r): assert r.bgsave() time.sleep(0.3) @@ -1312,7 +1313,7 @@ class TestRedisCommands: value2 = "mynewtext" res = "mytext" - if skip_if_redis_enterprise(None).args[0] is True: + if skip_if_redis_enterprise().args[0] is True: with pytest.raises(redis.exceptions.ResponseError): assert r.stralgo("LCS", value1, value2) == res return @@ -1354,7 +1355,7 @@ class TestRedisCommands: def test_substr(self, r): r["a"] = "0123456789" - if skip_if_redis_enterprise(None).args[0] is True: + if skip_if_redis_enterprise().args[0] is True: with pytest.raises(redis.exceptions.ResponseError): assert r.substr("a", 0) == b"0123456789" return @@ -2665,7 +2666,7 @@ class TestRedisCommands: @pytest.mark.onlynoncluster @skip_if_server_version_lt("3.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_readwrite(self, r): assert r.readwrite() @@ -4016,7 +4017,7 @@ class TestRedisCommands: @skip_if_server_version_lt("4.0.0") def test_memory_malloc_stats(self, r): - if skip_if_redis_enterprise(None).args[0] is True: + if skip_if_redis_enterprise().args[0] is True: with pytest.raises(redis.exceptions.ResponseError): assert r.memory_malloc_stats() return @@ -4029,7 +4030,7 @@ class TestRedisCommands: # has data r.set("foo", "bar") - if skip_if_redis_enterprise(None).args[0] is True: + if skip_if_redis_enterprise().args[0] is True: with pytest.raises(redis.exceptions.ResponseError): stats = r.memory_stats() return @@ -4047,7 +4048,7 @@ class TestRedisCommands: @pytest.mark.onlynoncluster @skip_if_server_version_lt("4.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_module_list(self, r): assert isinstance(r.module_list(), list) for x in r.module_list(): @@ -4088,7 +4089,7 @@ class TestRedisCommands: @pytest.mark.onlynoncluster @skip_if_server_version_lt("4.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_module(self, r): with pytest.raises(redis.exceptions.ModuleError) as excinfo: r.module_load("/some/fake/path") @@ -4144,7 +4145,7 @@ class TestRedisCommands: @pytest.mark.onlynoncluster @skip_if_server_version_lt("5.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_replicaof(self, r): with pytest.raises(redis.ResponseError): assert r.replicaof("NO ONE") @@ -4226,7 +4227,7 @@ class TestBinarySave: assert "6" in parsed["allocation_stats"] assert ">=256" in parsed["allocation_stats"] - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_large_responses(self, r): "The PythonParser has some special cases for return values > 1MB" # load up 5MB of data into a key diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py index 138fcad..3e1fbae 100644 --- a/tests/test_connection_pool.py +++ b/tests/test_connection_pool.py @@ -514,7 +514,7 @@ class TestConnection: @pytest.mark.onlynoncluster @skip_if_server_version_lt("2.8.8") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_busy_loading_disconnects_socket(self, r): """ If Redis raises a LOADING error, the connection should be @@ -526,7 +526,7 @@ class TestConnection: @pytest.mark.onlynoncluster @skip_if_server_version_lt("2.8.8") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_busy_loading_from_pipeline_immediate_command(self, r): """ BusyLoadingErrors should raise from Pipelines that execute a @@ -542,7 +542,7 @@ class TestConnection: @pytest.mark.onlynoncluster @skip_if_server_version_lt("2.8.8") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_busy_loading_from_pipeline(self, r): """ BusyLoadingErrors should be raised from a pipeline execution @@ -558,7 +558,7 @@ class TestConnection: assert not pool._available_connections[0]._sock @skip_if_server_version_lt("2.8.8") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_read_only_error(self, r): "READONLY errors get turned in ReadOnlyError exceptions" with pytest.raises(redis.ReadOnlyError): @@ -584,7 +584,7 @@ class TestConnection: "path=/path/to/socket,db=0", ) - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_connect_no_auth_supplied_when_required(self, r): """ AuthenticationError should be raised when the server requires a @@ -595,7 +595,7 @@ class TestConnection: "DEBUG", "ERROR", "ERR Client sent AUTH, but no password is set" ) - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_connect_invalid_password_supplied(self, r): "AuthenticationError should be raised when sending the wrong password" with pytest.raises(redis.AuthenticationError): diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 40d9e43..9b07c80 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -47,7 +47,7 @@ class TestMonitor: response = wait_for_command(r, m, "GET foo\\\\x92") assert response["command"] == "GET foo\\\\x92" - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_lua_script(self, r): with r.monitor() as m: script = 'return redis.call("GET", "foo")' @@ -58,7 +58,7 @@ class TestMonitor: assert response["client_address"] == "lua" assert response["client_port"] == "" - @skip_ifnot_redis_enterprise + @skip_ifnot_redis_enterprise() def test_lua_script_in_enterprise(self, r): with r.monitor() as m: script = 'return redis.call("GET", "foo")' diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 6df0faf..20ae0a0 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -530,7 +530,7 @@ class TestPubSubPings: @pytest.mark.onlynoncluster class TestPubSubConnectionKilled: @skip_if_server_version_lt("3.0.0") - @skip_if_redis_enterprise + @skip_if_redis_enterprise() def test_connection_error_raised_when_connection_dies(self, r): p = r.pubsub() p.subscribe("foo") |