summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBar Shaul <88437685+barshaul@users.noreply.github.com>2022-06-23 14:34:08 +0300
committerGitHub <noreply@github.com>2022-06-23 14:34:08 +0300
commit6da80865be650344964e9497a9edcb68ff102ee8 (patch)
tree9fa3159f0575df6f854d272307b2f1e311212cf7
parent23fd3273ba4dbee35585f53208dc044112dd391f (diff)
downloadredis-py-6da80865be650344964e9497a9edcb68ff102ee8.tar.gz
Reuse the old nodes' connections when a cluster topology refresh is being done (#2235)
* A fix was made to reuse the old nodes' connections when a cluster topology refresh is being done * Fixed RedisCluster to immediately raise AuthenticationError * Updated CHANGES * Fixed cluster async bgsave test to ignore "bgsave already in progress" error * Fixed linters
-rw-r--r--CHANGES2
-rw-r--r--redis/cluster.py37
-rw-r--r--tests/test_asyncio/test_cluster.py10
-rw-r--r--tests/test_cluster.py40
4 files changed, 75 insertions, 14 deletions
diff --git a/CHANGES b/CHANGES
index 2678218..4afc7ba 100644
--- a/CHANGES
+++ b/CHANGES
@@ -11,6 +11,8 @@
* Fix broken connection writer lock-up for asyncio (#2065)
* Fix auth bug when provided with no username (#2086)
* Fix missing ClusterPipeline._lock (#2189)
+ * Fix reusing the old nodes' connections when cluster topology refresh is being done
+ * Fix RedisCluster to immediately raise AuthenticationError without a retry
* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
diff --git a/redis/cluster.py b/redis/cluster.py
index 8e4c654..1737ec7 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -14,6 +14,7 @@ from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
AskError,
+ AuthenticationError,
BusyLoadingError,
ClusterCrossSlotError,
ClusterDownError,
@@ -1113,7 +1114,7 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
)
return response
- except (RedisClusterException, BusyLoadingError) as e:
+ except (RedisClusterException, BusyLoadingError, AuthenticationError) as e:
log.exception(type(e))
raise
except (ConnectionError, TimeoutError) as e:
@@ -1134,6 +1135,7 @@ class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
else:
# Hard force of reinitialize of the node/slots setup
# and try again with the new setup
+ target_node.redis_connection = None
self.nodes_manager.initialize()
raise
except MovedError as e:
@@ -1443,6 +1445,21 @@ class NodesManager:
r = Redis(host=host, port=port, **kwargs)
return r
+ def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
+ node_name = get_node_name(host, port)
+ # check if we already have this node in the tmp_nodes_cache
+ target_node = tmp_nodes_cache.get(node_name)
+ if target_node is None:
+ # before creating a new cluster node, check if the cluster node already
+ # exists in the current nodes cache and has a valid connection so we can
+ # reuse it
+ target_node = self.nodes_cache.get(node_name)
+ if target_node is None or target_node.redis_connection is None:
+ # create new cluster node for this cluster
+ target_node = ClusterNode(host, port, role)
+
+ return target_node
+
def initialize(self):
"""
Initializes the nodes cache, slots cache and redis connections.
@@ -1521,14 +1538,14 @@ class NodesManager:
for slot in cluster_slots:
primary_node = slot[2]
- host = primary_node[0]
+ host = str_if_bytes(primary_node[0])
if host == "":
host = startup_node.host
port = int(primary_node[1])
- target_node = tmp_nodes_cache.get(get_node_name(host, port))
- if target_node is None:
- target_node = ClusterNode(host, port, PRIMARY)
+ target_node = self._get_or_create_cluster_node(
+ host, port, PRIMARY, tmp_nodes_cache
+ )
# add this node to the nodes cache
tmp_nodes_cache[target_node.name] = target_node
@@ -1539,14 +1556,12 @@ class NodesManager:
replica_nodes = [slot[j] for j in range(3, len(slot))]
for replica_node in replica_nodes:
- host = replica_node[0]
+ host = str_if_bytes(replica_node[0])
port = replica_node[1]
- target_replica_node = tmp_nodes_cache.get(
- get_node_name(host, port)
+ target_replica_node = self._get_or_create_cluster_node(
+ host, port, REPLICA, tmp_nodes_cache
)
- if target_replica_node is None:
- target_replica_node = ClusterNode(host, port, REPLICA)
tmp_slots[i].append(target_replica_node)
# add this node to the nodes cache
tmp_nodes_cache[
@@ -1598,7 +1613,7 @@ class NodesManager:
# Set the default node
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
# Populate the startup nodes with all discovered nodes
- self.populate_startup_nodes(self.nodes_cache.values())
+ self.startup_nodes = tmp_nodes_cache
# If initialize was called after a MovedError, clear it
self._moved_exception = None
diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py
index e5ec026..f4ea5cd 100644
--- a/tests/test_asyncio/test_cluster.py
+++ b/tests/test_asyncio/test_cluster.py
@@ -1059,9 +1059,13 @@ class TestClusterRedisCommands:
@skip_if_redis_enterprise()
async def test_bgsave(self, r: RedisCluster) -> None:
- assert await r.bgsave()
- await asyncio.sleep(0.3)
- assert await r.bgsave(True)
+ try:
+ assert await r.bgsave()
+ await asyncio.sleep(0.3)
+ assert await r.bgsave(True)
+ except ResponseError as e:
+ if "Background save already in progress" not in e.__str__():
+ raise
async def test_info(self, r: RedisCluster) -> None:
# Map keys to same slot
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index d1568ef..438ef73 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -29,6 +29,7 @@ from redis.exceptions import (
RedisClusterException,
RedisError,
ResponseError,
+ TimeoutError,
)
from redis.utils import str_if_bytes
from tests.test_pubsub import wait_for_message
@@ -651,6 +652,45 @@ class TestRedisClusterObj:
else:
raise e
+ def test_timeout_error_topology_refresh_reuse_connections(self, r):
+ """
+ By mocking TIMEOUT errors, we'll force the cluster topology to be reinitialized,
+ and then ensure that only the impacted connection is replaced
+ """
+ node = r.get_node_from_key("key")
+ r.set("key", "value")
+ node_conn_origin = {}
+ for n in r.get_nodes():
+ node_conn_origin[n.name] = n.redis_connection
+ real_func = r.get_redis_connection(node).parse_response
+
+ class counter:
+ def __init__(self, val=0):
+ self.val = int(val)
+
+ count = counter(0)
+ with patch.object(Redis, "parse_response") as parse_response:
+
+ def moved_redirect_effect(connection, *args, **options):
+ # raise a timeout 5 times so we'll need to reinitialize the topology
+ if count.val >= 5:
+ parse_response.side_effect = real_func
+ count.val += 1
+ raise TimeoutError()
+
+ parse_response.side_effect = moved_redirect_effect
+ assert r.get("key") == b"value"
+ for node_name, conn in node_conn_origin.items():
+ if node_name == node.name:
+ # The old redis connection of the timed out node should have been
+ # deleted and replaced
+ assert conn != r.get_redis_connection(node)
+ else:
+ # other nodes' redis connection should have been reused during the
+ # topology refresh
+ cur_node = r.get_node(node_name=node_name)
+ assert conn == r.get_redis_connection(cur_node)
+
@pytest.mark.onlycluster
class TestClusterRedisCommands: