diff options
Diffstat (limited to 'redis/asyncio/client.py')
-rw-r--r-- | redis/asyncio/client.py | 99 |
1 files changed, 69 insertions, 30 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 5de2ff9..7986b11 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -500,28 +500,37 @@ class Redis( ): raise error - # COMMAND EXECUTION AND PROTOCOL PARSING - async def execute_command(self, *args, **options): - """Execute a command and return a parsed response""" - await self.initialize() - pool = self.connection_pool - command_name = args[0] - conn = self.connection or await pool.get_connection(command_name, **options) - - if self.single_connection_client: - await self._single_conn_lock.acquire() + async def _try_send_command_parse_response(self, conn, *args, **options): try: return await conn.retry.call_with_retry( lambda: self._send_command_parse_response( - conn, command_name, *args, **options + conn, args[0], *args, **options ), lambda error: self._disconnect_raise(conn, error), ) + except asyncio.CancelledError: + await conn.disconnect(nowait=True) + raise finally: if self.single_connection_client: self._single_conn_lock.release() if not self.connection: - await pool.release(conn) + await self.connection_pool.release(conn) + + # COMMAND EXECUTION AND PROTOCOL PARSING + async def execute_command(self, *args, **options): + """Execute a command and return a parsed response""" + await self.initialize() + pool = self.connection_pool + command_name = args[0] + conn = self.connection or await pool.get_connection(command_name, **options) + + if self.single_connection_client: + await self._single_conn_lock.acquire() + + return await asyncio.shield( + self._try_send_command_parse_response(conn, *args, **options) + ) async def parse_response( self, connection: Connection, command_name: Union[str, bytes], **options @@ -765,10 +774,18 @@ class PubSub: is not a TimeoutError. Otherwise, try to reconnect """ await conn.disconnect() + if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): raise error await conn.connect() + async def _try_execute(self, conn, command, *arg, **kwargs): + try: + return await command(*arg, **kwargs) + except asyncio.CancelledError: + await conn.disconnect() + raise + async def _execute(self, conn, command, *args, **kwargs): """ Connect manually upon disconnection. If the Redis server is down, @@ -777,9 +794,11 @@ class PubSub: called by the # connection to resubscribe us to any channels and patterns we were previously listening to """ - return await conn.retry.call_with_retry( - lambda: command(*args, **kwargs), - lambda error: self._disconnect_raise_connect(conn, error), + return await asyncio.shield( + conn.retry.call_with_retry( + lambda: self._try_execute(conn, command, *args, **kwargs), + lambda error: self._disconnect_raise_connect(conn, error), + ) ) async def parse_response(self, block: bool = True, timeout: float = 0): @@ -1181,6 +1200,18 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] await self.reset() raise + async def _try_send_command_parse_response(self, conn, *args, **options): + try: + return await conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, args[0], *args, **options + ), + lambda error: self._disconnect_reset_raise(conn, error), + ) + except asyncio.CancelledError: + await conn.disconnect() + raise + async def immediate_execute_command(self, *args, **options): """ Execute a command immediately, but don't auto-retry on a @@ -1196,13 +1227,13 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] command_name, self.shard_hint ) self.connection = conn - - return await conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, command_name, *args, **options - ), - lambda error: self._disconnect_reset_raise(conn, error), - ) + try: + return await asyncio.shield( + self._try_send_command_parse_response(conn, *args, **options) + ) + except asyncio.CancelledError: + await conn.disconnect() + raise def pipeline_execute_command(self, *args, **options): """ @@ -1369,6 +1400,19 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] await self.reset() raise + async def _try_execute(self, conn, execute, stack, raise_on_error): + try: + return await conn.retry.call_with_retry( + lambda: execute(conn, stack, raise_on_error), + lambda error: self._disconnect_raise_reset(conn, error), + ) + except asyncio.CancelledError: + # not supposed to be possible, yet here we are + await conn.disconnect(nowait=True) + raise + finally: + await self.reset() + async def execute(self, raise_on_error: bool = True): """Execute all the commands in the current pipeline""" stack = self.command_stack @@ -1391,15 +1435,10 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] try: return await asyncio.shield( - conn.retry.call_with_retry( - lambda: execute(conn, stack, raise_on_error), - lambda error: self._disconnect_raise_reset(conn, error), - ) + self._try_execute(conn, execute, stack, raise_on_error) ) - except asyncio.CancelledError: - # not supposed to be possible, yet here we are - await conn.disconnect(nowait=True) - raise + except RuntimeError: + await self.reset() finally: await self.reset() |