summaryrefslogtreecommitdiff
path: root/redis/asyncio
diff options
context:
space:
mode:
authordvora-h <67596500+dvora-h@users.noreply.github.com>2023-03-29 16:50:37 +0300
committerGitHub <noreply@github.com>2023-03-29 16:50:37 +0300
commit49d9cb751c7c30546fc86d11f8168d7aa007edae (patch)
treeb940ea97748b242c40a6b7c2d2e87cabf4d002d8 /redis/asyncio
parentb3c89acd0ffe8303649ad8207bc911b1d6a033eb (diff)
downloadredis-py-4.4.tar.gz
Version 4.4.4 (#2671)v4.4.44.4
* Fixing cancelled async futures (#2666) Co-authored-by: James R T <jamestiotio@gmail.com> Co-authored-by: dvora-h <dvora.heller@redis.com> * Version 4.4.4 * fix tests * linters * fixing the test for the 4.4 state * remove superfluous try-except --------- Co-authored-by: Chayim <chayim@users.noreply.github.com> Co-authored-by: James R T <jamestiotio@gmail.com> Co-authored-by: Chayim I. Kirshen <c@kirshen.com>
Diffstat (limited to 'redis/asyncio')
-rw-r--r--redis/asyncio/client.py88
-rw-r--r--redis/asyncio/cluster.py21
2 files changed, 75 insertions, 34 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py
index 0bd21d3..4807d08 100644
--- a/redis/asyncio/client.py
+++ b/redis/asyncio/client.py
@@ -493,24 +493,32 @@ class Redis(
):
raise error
- # COMMAND EXECUTION AND PROTOCOL PARSING
- async def execute_command(self, *args, **options):
- """Execute a command and return a parsed response"""
- await self.initialize()
- pool = self.connection_pool
- command_name = args[0]
- conn = self.connection or await pool.get_connection(command_name, **options)
-
+ async def _try_send_command_parse_response(self, conn, *args, **options):
try:
return await conn.retry.call_with_retry(
lambda: self._send_command_parse_response(
- conn, command_name, *args, **options
+ conn, args[0], *args, **options
),
lambda error: self._disconnect_raise(conn, error),
)
+ except asyncio.CancelledError:
+ await conn.disconnect(nowait=True)
+ raise
finally:
if not self.connection:
- await pool.release(conn)
+ await self.connection_pool.release(conn)
+
+ # COMMAND EXECUTION AND PROTOCOL PARSING
+ async def execute_command(self, *args, **options):
+ """Execute a command and return a parsed response"""
+ await self.initialize()
+ pool = self.connection_pool
+ command_name = args[0]
+ conn = self.connection or await pool.get_connection(command_name, **options)
+
+ return await asyncio.shield(
+ self._try_send_command_parse_response(conn, *args, **options)
+ )
async def parse_response(
self, connection: Connection, command_name: Union[str, bytes], **options
@@ -749,10 +757,18 @@ class PubSub:
is not a TimeoutError. Otherwise, try to reconnect
"""
await conn.disconnect()
+
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
raise error
await conn.connect()
+ async def _try_execute(self, conn, command, *arg, **kwargs):
+ try:
+ return await command(*arg, **kwargs)
+ except asyncio.CancelledError:
+ await conn.disconnect()
+ raise
+
async def _execute(self, conn, command, *args, **kwargs):
"""
Connect manually upon disconnection. If the Redis server is down,
@@ -761,9 +777,11 @@ class PubSub:
called by the # connection to resubscribe us to any channels and
patterns we were previously listening to
"""
- return await conn.retry.call_with_retry(
- lambda: command(*args, **kwargs),
- lambda error: self._disconnect_raise_connect(conn, error),
+ return await asyncio.shield(
+ conn.retry.call_with_retry(
+ lambda: self._try_execute(conn, command, *args, **kwargs),
+ lambda error: self._disconnect_raise_connect(conn, error),
+ )
)
async def parse_response(self, block: bool = True, timeout: float = 0):
@@ -1165,6 +1183,18 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
await self.reset()
raise
+ async def _try_send_command_parse_response(self, conn, *args, **options):
+ try:
+ return await conn.retry.call_with_retry(
+ lambda: self._send_command_parse_response(
+ conn, args[0], *args, **options
+ ),
+ lambda error: self._disconnect_reset_raise(conn, error),
+ )
+ except asyncio.CancelledError:
+ await conn.disconnect()
+ raise
+
async def immediate_execute_command(self, *args, **options):
"""
Execute a command immediately, but don't auto-retry on a
@@ -1180,12 +1210,8 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
command_name, self.shard_hint
)
self.connection = conn
-
- return await conn.retry.call_with_retry(
- lambda: self._send_command_parse_response(
- conn, command_name, *args, **options
- ),
- lambda error: self._disconnect_reset_raise(conn, error),
+ return await asyncio.shield(
+ self._try_send_command_parse_response(conn, *args, **options)
)
def pipeline_execute_command(self, *args, **options):
@@ -1353,6 +1379,19 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
await self.reset()
raise
+ async def _try_execute(self, conn, execute, stack, raise_on_error):
+ try:
+ return await conn.retry.call_with_retry(
+ lambda: execute(conn, stack, raise_on_error),
+ lambda error: self._disconnect_raise_reset(conn, error),
+ )
+ except asyncio.CancelledError:
+ # not supposed to be possible, yet here we are
+ await conn.disconnect(nowait=True)
+ raise
+ finally:
+ await self.reset()
+
async def execute(self, raise_on_error: bool = True):
"""Execute all the commands in the current pipeline"""
stack = self.command_stack
@@ -1375,15 +1414,10 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
try:
return await asyncio.shield(
- conn.retry.call_with_retry(
- lambda: execute(conn, stack, raise_on_error),
- lambda error: self._disconnect_raise_reset(conn, error),
- )
+ self._try_execute(conn, execute, stack, raise_on_error)
)
- except asyncio.CancelledError:
- # not supposed to be possible, yet here we are
- await conn.disconnect(nowait=True)
- raise
+ except RuntimeError:
+ await self.reset()
finally:
await self.reset()
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 569a076..a4a9561 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -1016,6 +1016,19 @@ class ClusterNode:
finally:
self._free.append(connection)
+ async def _try_parse_response(self, cmd, connection, ret):
+ try:
+ cmd.result = await asyncio.shield(
+ self.parse_response(connection, cmd.args[0], **cmd.kwargs)
+ )
+ except asyncio.CancelledError:
+ await connection.disconnect(nowait=True)
+ raise
+ except Exception as e:
+ cmd.result = e
+ ret = True
+ return ret
+
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
# Acquire connection
connection = self.acquire_connection()
@@ -1028,13 +1041,7 @@ class ClusterNode:
# Read responses
ret = False
for cmd in commands:
- try:
- cmd.result = await self.parse_response(
- connection, cmd.args[0], **cmd.kwargs
- )
- except Exception as e:
- cmd.result = e
- ret = True
+ ret = await asyncio.shield(self._try_parse_response(cmd, connection, ret))
# Release connection
self._free.append(connection)