From 0647659c2f3550063c404a616ba37987c9158149 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Tue, 27 May 2014 13:03:45 -0700 Subject: Don't retry commands that fail due to a socket.timeout by default. Users now have the ability about how socket.timeout errors are handled. Previously socket.timeout errors were handled just like any other socket error in that the command would be retried once. This createed a potential race condition when the client sends a command to a busy Redis server that can't reply faster than the client's `socket_timeout` option. In this case, the server will still eventually process the command. There's now a `retry_on_timeout` option that's set to False by default. If `retry_on_timeout` is False, any socket.timeout error will raise a TimeoutError exception. If `retry_on_timeout` is set to True, the client will retry executing the command once just like other socket.error exceptions. TODO: Write better tests for this code. TODO: Much of this logic could/should be moved to the ConnectionPool or Connection objects. Fixes #261 --- redis/__init__.py | 15 ++++++++------- redis/client.py | 24 +++++++++++++++++------- redis/connection.py | 17 ++++++++++++++--- redis/exceptions.py | 6 +++--- 4 files changed, 42 insertions(+), 20 deletions(-) diff --git a/redis/__init__.py b/redis/__init__.py index 2da66e2..cdfa1f2 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -9,15 +9,16 @@ from redis.connection import ( from redis.utils import from_url from redis.exceptions import ( AuthenticationError, - ConnectionError, BusyLoadingError, + ConnectionError, DataError, InvalidResponse, PubSubError, + ReadOnlyError, RedisError, ResponseError, - WatchError, - ReadOnlyError + TimeoutError, + WatchError ) @@ -26,8 +27,8 @@ VERSION = tuple(map(int, __version__.split('.'))) __all__ = [ 'Redis', 'StrictRedis', 'ConnectionPool', 'BlockingConnectionPool', - 'Connection', 'SSLConnection', 'UnixDomainSocketConnection', - 'RedisError', 'ConnectionError', 'ResponseError', 'AuthenticationError', - 'InvalidResponse', 'DataError', 'PubSubError', 'WatchError', 'from_url', - 'BusyLoadingError', 'ReadOnlyError' + 'Connection', 'SSLConnection', 'UnixDomainSocketConnection', 'from_url', + 'AuthenticationError', 'BusyLoadingError', 'ConnectionError', 'DataError', + 'InvalidResponse', 'PubSubError', 'ReadOnlyError', 'RedisError', + 'ResponseError', 'TimeoutError', 'WatchError' ] diff --git a/redis/client.py b/redis/client.py index b0f8c46..511df4d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -17,6 +17,7 @@ from redis.exceptions import ( PubSubError, RedisError, ResponseError, + TimeoutError, WatchError, ) @@ -390,8 +391,8 @@ class StrictRedis(object): db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, - connection_pool=None, charset='utf-8', - errors='strict', decode_responses=False, + connection_pool=None, charset='utf-8', errors='strict', + decode_responses=False, retry_on_timeout=False, unix_socket_path=None, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None): @@ -403,6 +404,7 @@ class StrictRedis(object): 'encoding': charset, 'encoding_errors': errors, 'decode_responses': decode_responses, + 'retry_on_timeout': retry_on_timeout } # based on input, setup appropriate connection args if unix_socket_path is not None: @@ -504,8 +506,10 @@ class StrictRedis(object): try: connection.send_command(*args) return self.parse_response(connection, command_name, **options) - except ConnectionError: + except (ConnectionError, TimeoutError) as e: connection.disconnect() + if not connection.retry_on_timeout and isinstance(e, TimeoutError): + raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: @@ -2049,8 +2053,10 @@ class PubSub(object): def _execute(self, connection, command, *args): try: return command(*args) - except ConnectionError: + except (ConnectionError, TimeoutError) as e: connection.disconnect() + if not connection.retry_on_timeout and isinstance(e, TimeoutError): + raise # Connect manually here. If the Redis server is down, this will # fail and raise a ConnectionError as desired. connection.connect() @@ -2328,8 +2334,10 @@ class BasePipeline(object): try: conn.send_command(*args) return self.parse_response(conn, command_name, **options) - except ConnectionError: + except (ConnectionError, TimeoutError) as e: conn.disconnect() + if not conn.retry_on_timeout and isinstance(e, TimeoutError): + raise # if we're not already watching, we can safely retry the command try: if not self.watching: @@ -2495,12 +2503,14 @@ class BasePipeline(object): try: return execute(conn, stack, raise_on_error) - except ConnectionError: + except (ConnectionError, TimeoutError) as e: conn.disconnect() + if not conn.retry_on_timeout and isinstance(e, TimeoutError): + raise # if we were watching a variable, the watch is no longer valid # since this connection has died. raise a WatchError, which # indicates the user should retry his transaction. If this is more - # than a temporary failure, the WATCH that the user next issue + # than a temporary failure, the WATCH that the user next issues # will fail, propegating the real ConnectionError if self.watching: raise WatchError("A ConnectionError occured on while watching " diff --git a/redis/connection.py b/redis/connection.py index a7a2b69..ff8a022 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -20,6 +20,7 @@ from redis._compat import (b, xrange, imap, byte_to_chr, unicode, bytes, long, from redis.exceptions import ( RedisError, ConnectionError, + TimeoutError, BusyLoadingError, ResponseError, InvalidResponse, @@ -117,7 +118,9 @@ class SocketBuffer(object): if length is not None and length > marker: continue break - except (socket.error, socket.timeout): + except socket.timeout: + raise TimeoutError("Timeout reading from socket") + except socket.error: e = sys.exc_info()[1] raise ConnectionError("Error while reading from socket: %s" % (e.args,)) @@ -313,7 +316,9 @@ class HiredisParser(BaseParser): # an empty string indicates the server shutdown the socket if isinstance(buffer, str) and len(buffer) == 0: raise socket.error("Connection closed by remote server.") - except (socket.error, socket.timeout): + except socket.timeout: + raise TimeoutError("Timeout reading from socket") + except socket.error: e = sys.exc_info()[1] raise ConnectionError("Error while reading from socket: %s" % (e.args,)) @@ -356,7 +361,7 @@ class Connection(object): def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=False, socket_keepalive_options=None, - encoding='utf-8', + retry_on_timeout=False, encoding='utf-8', encoding_errors='strict', decode_responses=False, parser_class=DefaultParser, socket_read_size=65536): self.pid = os.getpid() @@ -368,6 +373,7 @@ class Connection(object): self.socket_connect_timeout = socket_connect_timeout or socket_timeout self.socket_keepalive = socket_keepalive self.socket_keepalive_options = socket_keepalive_options or {} + self.retry_on_timeout = retry_on_timeout self.encoding = encoding self.encoding_errors = encoding_errors self.decode_responses = decode_responses @@ -505,6 +511,9 @@ class Connection(object): command = [command] for item in command: self._sock.sendall(item) + except socket.timeout: + self.disconnect() + raise TimeoutError("Timeout writing to socket") except socket.error: e = sys.exc_info()[1] self.disconnect() @@ -633,12 +642,14 @@ class UnixDomainSocketConnection(Connection): def __init__(self, path='', db=0, password=None, socket_timeout=None, encoding='utf-8', encoding_errors='strict', decode_responses=False, + retry_on_timeout=False, parser_class=DefaultParser, socket_read_size=65536): self.pid = os.getpid() self.path = path self.db = db self.password = password self.socket_timeout = socket_timeout + self.retry_on_timeout = retry_on_timeout self.encoding = encoding self.encoding_errors = encoding_errors self.decode_responses = decode_responses diff --git a/redis/exceptions.py b/redis/exceptions.py index 963e2d0..b8b81dc 100644 --- a/redis/exceptions.py +++ b/redis/exceptions.py @@ -20,11 +20,11 @@ class AuthenticationError(RedisError): pass -class ServerError(RedisError): +class ConnectionError(RedisError): pass -class ConnectionError(ServerError): +class TimeoutError(RedisError): pass @@ -32,7 +32,7 @@ class BusyLoadingError(ConnectionError): pass -class InvalidResponse(ServerError): +class InvalidResponse(RedisError): pass -- cgit v1.2.1