diff options
Diffstat (limited to 'redis/asyncio/client.py')
-rw-r--r-- | redis/asyncio/client.py | 93 |
1 files changed, 27 insertions, 66 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 5fb94b3..a7b888e 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -500,23 +500,6 @@ class Redis( ): raise error - async def _try_send_command_parse_response(self, conn, *args, **options): - try: - return await conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, args[0], *args, **options - ), - lambda error: self._disconnect_raise(conn, error), - ) - except asyncio.CancelledError: - await conn.disconnect(nowait=True) - raise - finally: - if self.single_connection_client: - self._single_conn_lock.release() - if not self.connection: - await self.connection_pool.release(conn) - # COMMAND EXECUTION AND PROTOCOL PARSING async def execute_command(self, *args, **options): """Execute a command and return a parsed response""" @@ -527,10 +510,18 @@ class Redis( if self.single_connection_client: await self._single_conn_lock.acquire() - - return await asyncio.shield( - self._try_send_command_parse_response(conn, *args, **options) - ) + try: + return await conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, command_name, *args, **options + ), + lambda error: self._disconnect_raise(conn, error), + ) + finally: + if self.single_connection_client: + self._single_conn_lock.release() + if not self.connection: + await pool.release(conn) async def parse_response( self, connection: Connection, command_name: Union[str, bytes], **options @@ -774,18 +765,10 @@ class PubSub: is not a TimeoutError. Otherwise, try to reconnect """ await conn.disconnect() - if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): raise error await conn.connect() - async def _try_execute(self, conn, command, *arg, **kwargs): - try: - return await command(*arg, **kwargs) - except asyncio.CancelledError: - await conn.disconnect() - raise - async def _execute(self, conn, command, *args, **kwargs): """ Connect manually upon disconnection. If the Redis server is down, @@ -794,11 +777,9 @@ class PubSub: called by the # connection to resubscribe us to any channels and patterns we were previously listening to """ - return await asyncio.shield( - conn.retry.call_with_retry( - lambda: self._try_execute(conn, command, *args, **kwargs), - lambda error: self._disconnect_raise_connect(conn, error), - ) + return await conn.retry.call_with_retry( + lambda: command(*args, **kwargs), + lambda error: self._disconnect_raise_connect(conn, error), ) async def parse_response(self, block: bool = True, timeout: float = 0): @@ -816,7 +797,9 @@ class PubSub: await conn.connect() read_timeout = None if block else timeout - response = await self._execute(conn, conn.read_response, timeout=read_timeout) + response = await self._execute( + conn, conn.read_response, timeout=read_timeout, disconnect_on_error=False + ) if conn.health_check_interval and response == self.health_check_response: # ignore the health check message as user might not expect it @@ -1200,18 +1183,6 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] await self.reset() raise - async def _try_send_command_parse_response(self, conn, *args, **options): - try: - return await conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, args[0], *args, **options - ), - lambda error: self._disconnect_reset_raise(conn, error), - ) - except asyncio.CancelledError: - await conn.disconnect() - raise - async def immediate_execute_command(self, *args, **options): """ Execute a command immediately, but don't auto-retry on a @@ -1227,8 +1198,12 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] command_name, self.shard_hint ) self.connection = conn - return await asyncio.shield( - self._try_send_command_parse_response(conn, *args, **options) + + return await conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, command_name, *args, **options + ), + lambda error: self._disconnect_reset_raise(conn, error), ) def pipeline_execute_command(self, *args, **options): @@ -1396,19 +1371,6 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] await self.reset() raise - async def _try_execute(self, conn, execute, stack, raise_on_error): - try: - return await conn.retry.call_with_retry( - lambda: execute(conn, stack, raise_on_error), - lambda error: self._disconnect_raise_reset(conn, error), - ) - except asyncio.CancelledError: - # not supposed to be possible, yet here we are - await conn.disconnect(nowait=True) - raise - finally: - await self.reset() - async def execute(self, raise_on_error: bool = True): """Execute all the commands in the current pipeline""" stack = self.command_stack @@ -1430,11 +1392,10 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] conn = cast(Connection, conn) try: - return await asyncio.shield( - self._try_execute(conn, execute, stack, raise_on_error) + return await conn.retry.call_with_retry( + lambda: execute(conn, stack, raise_on_error), + lambda error: self._disconnect_raise_reset(conn, error), ) - except RuntimeError: - await self.reset() finally: await self.reset() |