summaryrefslogtreecommitdiff
path: root/redis/asyncio/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/asyncio/client.py')
-rw-r--r--redis/asyncio/client.py93
1 files changed, 27 insertions, 66 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py
index 5fb94b3..a7b888e 100644
--- a/redis/asyncio/client.py
+++ b/redis/asyncio/client.py
@@ -500,23 +500,6 @@ class Redis(
):
raise error
- async def _try_send_command_parse_response(self, conn, *args, **options):
- try:
- return await conn.retry.call_with_retry(
- lambda: self._send_command_parse_response(
- conn, args[0], *args, **options
- ),
- lambda error: self._disconnect_raise(conn, error),
- )
- except asyncio.CancelledError:
- await conn.disconnect(nowait=True)
- raise
- finally:
- if self.single_connection_client:
- self._single_conn_lock.release()
- if not self.connection:
- await self.connection_pool.release(conn)
-
# COMMAND EXECUTION AND PROTOCOL PARSING
async def execute_command(self, *args, **options):
"""Execute a command and return a parsed response"""
@@ -527,10 +510,18 @@ class Redis(
if self.single_connection_client:
await self._single_conn_lock.acquire()
-
- return await asyncio.shield(
- self._try_send_command_parse_response(conn, *args, **options)
- )
+ try:
+ return await conn.retry.call_with_retry(
+ lambda: self._send_command_parse_response(
+ conn, command_name, *args, **options
+ ),
+ lambda error: self._disconnect_raise(conn, error),
+ )
+ finally:
+ if self.single_connection_client:
+ self._single_conn_lock.release()
+ if not self.connection:
+ await pool.release(conn)
async def parse_response(
self, connection: Connection, command_name: Union[str, bytes], **options
@@ -774,18 +765,10 @@ class PubSub:
is not a TimeoutError. Otherwise, try to reconnect
"""
await conn.disconnect()
-
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
raise error
await conn.connect()
- async def _try_execute(self, conn, command, *arg, **kwargs):
- try:
- return await command(*arg, **kwargs)
- except asyncio.CancelledError:
- await conn.disconnect()
- raise
-
async def _execute(self, conn, command, *args, **kwargs):
"""
Connect manually upon disconnection. If the Redis server is down,
@@ -794,11 +777,9 @@ class PubSub:
called by the # connection to resubscribe us to any channels and
patterns we were previously listening to
"""
- return await asyncio.shield(
- conn.retry.call_with_retry(
- lambda: self._try_execute(conn, command, *args, **kwargs),
- lambda error: self._disconnect_raise_connect(conn, error),
- )
+ return await conn.retry.call_with_retry(
+ lambda: command(*args, **kwargs),
+ lambda error: self._disconnect_raise_connect(conn, error),
)
async def parse_response(self, block: bool = True, timeout: float = 0):
@@ -816,7 +797,9 @@ class PubSub:
await conn.connect()
read_timeout = None if block else timeout
- response = await self._execute(conn, conn.read_response, timeout=read_timeout)
+ response = await self._execute(
+ conn, conn.read_response, timeout=read_timeout, disconnect_on_error=False
+ )
if conn.health_check_interval and response == self.health_check_response:
# ignore the health check message as user might not expect it
@@ -1200,18 +1183,6 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
await self.reset()
raise
- async def _try_send_command_parse_response(self, conn, *args, **options):
- try:
- return await conn.retry.call_with_retry(
- lambda: self._send_command_parse_response(
- conn, args[0], *args, **options
- ),
- lambda error: self._disconnect_reset_raise(conn, error),
- )
- except asyncio.CancelledError:
- await conn.disconnect()
- raise
-
async def immediate_execute_command(self, *args, **options):
"""
Execute a command immediately, but don't auto-retry on a
@@ -1227,8 +1198,12 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
command_name, self.shard_hint
)
self.connection = conn
- return await asyncio.shield(
- self._try_send_command_parse_response(conn, *args, **options)
+
+ return await conn.retry.call_with_retry(
+ lambda: self._send_command_parse_response(
+ conn, command_name, *args, **options
+ ),
+ lambda error: self._disconnect_reset_raise(conn, error),
)
def pipeline_execute_command(self, *args, **options):
@@ -1396,19 +1371,6 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
await self.reset()
raise
- async def _try_execute(self, conn, execute, stack, raise_on_error):
- try:
- return await conn.retry.call_with_retry(
- lambda: execute(conn, stack, raise_on_error),
- lambda error: self._disconnect_raise_reset(conn, error),
- )
- except asyncio.CancelledError:
- # not supposed to be possible, yet here we are
- await conn.disconnect(nowait=True)
- raise
- finally:
- await self.reset()
-
async def execute(self, raise_on_error: bool = True):
"""Execute all the commands in the current pipeline"""
stack = self.command_stack
@@ -1430,11 +1392,10 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
conn = cast(Connection, conn)
try:
- return await asyncio.shield(
- self._try_execute(conn, execute, stack, raise_on_error)
+ return await conn.retry.call_with_retry(
+ lambda: execute(conn, stack, raise_on_error),
+ lambda error: self._disconnect_raise_reset(conn, error),
)
- except RuntimeError:
- await self.reset()
finally:
await self.reset()