summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2019-06-01 08:38:01 -0700
committerAndy McCurdy <andy@andymccurdy.com>2019-07-09 17:03:36 -0700
commitacac4db0c064d2618d75f27a58384c97c16458fd (patch)
treed06bbe03bb758c3cc443128d6046118146979240
parent9ed2132af28652e964b6a4a20d387580d1081003 (diff)
downloadredis-py-acac4db0c064d2618d75f27a58384c97c16458fd.tar.gz
Use nonblocking sockets instead of selectors for healthy connections
This replaces the work in 3.2.0 to use nonblocking sockets instead of selectors. Selectors proved to be problematic for some environments including eventlet and gevent. Nonblocking sockets should be available in all environments.
-rw-r--r--CHANGES5
-rw-r--r--redis/__init__.py2
-rw-r--r--redis/_compat.py4
-rwxr-xr-xredis/connection.py155
-rw-r--r--redis/selector.py196
-rw-r--r--tests/test_connection_pool.py4
-rw-r--r--tests/test_pubsub.py13
-rw-r--r--tests/test_selector.py122
8 files changed, 128 insertions, 373 deletions
diff --git a/CHANGES b/CHANGES
index 5256faf..e2d06c1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,4 +1,4 @@
-* 3.2.2 (in development)
+* 3.3.0 (in development)
* Resolve a race condition with the PubSubWorkerThread. #1150
* Cleanup socket read error messages. Thanks Vic Yu. #1159
* Cleanup the Connection's selector correctly. Thanks Bruce Merry. #1153
@@ -17,6 +17,9 @@
cause the connection to be disconnected and cleaned up appropriately.
#923
* Add READONLY and READWRITE commands. Thanks @theodesp. #1114
+ * Remove selectors in favor of nonblocking sockets. Selectors had
+ issues in some environments including eventlet and gevent. This should
+ resolve those issues with no other side effects.
* 3.2.1
* Fix SentinelConnectionPool to work in multiprocess/forked environments.
* 3.2.0
diff --git a/redis/__init__.py b/redis/__init__.py
index b74c403..2135fd8 100644
--- a/redis/__init__.py
+++ b/redis/__init__.py
@@ -29,7 +29,7 @@ def int_or_str(value):
return value
-__version__ = '3.2.1'
+__version__ = '3.3.dev2'
VERSION = tuple(map(int_or_str, __version__.split('.')))
__all__ = [
diff --git a/redis/_compat.py b/redis/_compat.py
index bde6fb6..d70af2a 100644
--- a/redis/_compat.py
+++ b/redis/_compat.py
@@ -1,12 +1,12 @@
"""Internal module for Python 2 backwards compatibility."""
import errno
+import socket
import sys
# For Python older than 3.5, retry EINTR.
if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and
sys.version_info[1] < 5):
# Adapted from https://bugs.python.org/review/23863/patch/14532/54418
- import socket
import time
# Wrapper for handling interruptable system calls.
@@ -100,6 +100,7 @@ if sys.version_info[0] < 3:
basestring = basestring
unicode = unicode
long = long
+ BlockingIOError = socket.error
else:
from urllib.parse import parse_qs, unquote, urlparse
from string import ascii_letters
@@ -129,6 +130,7 @@ else:
unicode = str
safe_unicode = str
long = int
+ BlockingIOError = BlockingIOError
try: # Python 3
from queue import LifoQueue, Empty, Full
diff --git a/redis/connection.py b/redis/connection.py
index 88286c8..7d4301a 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -1,5 +1,6 @@
from __future__ import unicode_literals
from distutils.version import StrictVersion
+from errno import EWOULDBLOCK
from itertools import chain
import io
import os
@@ -17,7 +18,7 @@ except ImportError:
from redis._compat import (xrange, imap, byte_to_chr, unicode, long,
nativestr, basestring, iteritems,
LifoQueue, Empty, Full, urlparse, parse_qs,
- recv, recv_into, unquote)
+ recv, recv_into, unquote, BlockingIOError)
from redis.exceptions import (
DataError,
RedisError,
@@ -31,7 +32,6 @@ from redis.exceptions import (
ExecAbortError,
ReadOnlyError
)
-from redis.selector import DefaultSelector
from redis.utils import HIREDIS_AVAILABLE
if HIREDIS_AVAILABLE:
import hiredis
@@ -61,6 +61,8 @@ SYM_EMPTY = b''
SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."
+SENTINEL = object()
+
class Encoder(object):
"Encode strings to bytes and decode bytes to strings"
@@ -126,9 +128,10 @@ class BaseParser(object):
class SocketBuffer(object):
- def __init__(self, socket, socket_read_size):
+ def __init__(self, socket, socket_read_size, socket_timeout):
self._sock = socket
self.socket_read_size = socket_read_size
+ self.socket_timeout = socket_timeout
self._buffer = io.BytesIO()
# number of bytes written to the buffer from the socket
self.bytes_written = 0
@@ -139,25 +142,51 @@ class SocketBuffer(object):
def length(self):
return self.bytes_written - self.bytes_read
- def _read_from_socket(self, length=None):
+ def _read_from_socket(self, length=None, timeout=SENTINEL,
+ raise_on_timeout=True):
+ sock = self._sock
socket_read_size = self.socket_read_size
buf = self._buffer
buf.seek(self.bytes_written)
marker = 0
+ custom_timeout = timeout is not SENTINEL
- while True:
- data = recv(self._sock, socket_read_size)
- # an empty string indicates the server shutdown the socket
- if isinstance(data, bytes) and len(data) == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- buf.write(data)
- data_length = len(data)
- self.bytes_written += data_length
- marker += data_length
-
- if length is not None and length > marker:
- continue
- break
+ try:
+ if custom_timeout:
+ sock.settimeout(timeout)
+ while True:
+ data = recv(self._sock, socket_read_size)
+ # an empty string indicates the server shutdown the socket
+ if isinstance(data, bytes) and len(data) == 0:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ buf.write(data)
+ data_length = len(data)
+ self.bytes_written += data_length
+ marker += data_length
+
+ if length is not None and length > marker:
+ continue
+ return True
+ except BlockingIOError as ex:
+ # if we're in nonblocking mode and the recv raises a
+ # blocking error, simply return False indicating that
+ # there's no data to be read. otherwise raise the
+ # original exception.
+ if raise_on_timeout or ex.errno != EWOULDBLOCK:
+ raise
+ return False
+ except socket.timeout:
+ if raise_on_timeout:
+ raise
+ return False
+ finally:
+ if custom_timeout:
+ sock.settimeout(self.socket_timeout)
+
+ def can_read(self, timeout):
+ return bool(self.length) or \
+ self._read_from_socket(timeout=timeout,
+ raise_on_timeout=False)
def read(self, length):
length = length + 2 # make sure to read the \r\n terminator
@@ -233,7 +262,9 @@ class PythonParser(BaseParser):
def on_connect(self, connection):
"Called when the socket connects"
self._sock = connection._sock
- self._buffer = SocketBuffer(self._sock, self.socket_read_size)
+ self._buffer = SocketBuffer(self._sock,
+ self.socket_read_size,
+ connection.socket_timeout)
self.encoder = connection.encoder
def on_disconnect(self):
@@ -244,8 +275,8 @@ class PythonParser(BaseParser):
self._buffer = None
self.encoder = None
- def can_read(self):
- return self._buffer and bool(self._buffer.length)
+ def can_read(self, timeout):
+ return self._buffer and self._buffer.can_read(timeout)
def read_response(self):
response = self._buffer.readline()
@@ -312,6 +343,7 @@ class HiredisParser(BaseParser):
def on_connect(self, connection):
self._sock = connection._sock
+ self._socket_timeout = connection.socket_timeout
kwargs = {
'protocolError': InvalidResponse,
'replyError': self.parse_error,
@@ -333,13 +365,52 @@ class HiredisParser(BaseParser):
self._reader = None
self._next_response = False
- def can_read(self):
+ def can_read(self, timeout):
if not self._reader:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
if self._next_response is False:
self._next_response = self._reader.gets()
- return self._next_response is not False
+ if self._next_response is False:
+ return self.read_from_socket(timeout=timeout,
+ raise_on_timeout=False)
+ return True
+
+ def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True):
+ sock = self._sock
+ custom_timeout = timeout is not SENTINEL
+ try:
+ if custom_timeout:
+ sock.settimeout(timeout)
+ if HIREDIS_USE_BYTE_BUFFER:
+ bufflen = recv_into(self._sock, self._buffer)
+ if bufflen == 0:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ self._reader.feed(self._buffer, 0, bufflen)
+ else:
+ buffer = recv(self._sock, self.socket_read_size)
+ # an empty string indicates the server shutdown the socket
+ if not isinstance(buffer, bytes) or len(buffer) == 0:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ self._reader.feed(buffer)
+ # data was read from the socket and added to the buffer.
+ # return True to indicate that data was read.
+ return True
+ except BlockingIOError as ex:
+ # if we're in nonblocking mode and the recv raises a
+ # blocking error, simply return False indicating that
+ # there's no data to be read. otherwise raise the
+ # original exception.
+ if raise_on_timeout or ex.errno != EWOULDBLOCK:
+ raise
+ return False
+ except socket.timeout:
+ if not raise_on_timeout:
+ raise
+ return False
+ finally:
+ if custom_timeout:
+ sock.settimeout(self._socket_timeout)
def read_response(self):
if not self._reader:
@@ -352,21 +423,8 @@ class HiredisParser(BaseParser):
return response
response = self._reader.gets()
- socket_read_size = self.socket_read_size
while response is False:
- if HIREDIS_USE_BYTE_BUFFER:
- bufflen = recv_into(self._sock, self._buffer)
- if bufflen == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- else:
- buffer = recv(self._sock, socket_read_size)
- # an empty string indicates the server shutdown the socket
- if not isinstance(buffer, bytes) or len(buffer) == 0:
- raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
- if HIREDIS_USE_BYTE_BUFFER:
- self._reader.feed(self._buffer, 0, bufflen)
- else:
- self._reader.feed(buffer)
+ self.read_from_socket()
response = self._reader.gets()
# if an older version of hiredis is installed, we need to attempt
# to convert ResponseErrors to their appropriate types.
@@ -416,7 +474,6 @@ class Connection(object):
self.retry_on_timeout = retry_on_timeout
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
- self._selector = None
self._parser = parser_class(socket_read_size=socket_read_size)
self._description_args = {
'host': self.host,
@@ -454,7 +511,6 @@ class Connection(object):
raise ConnectionError(self._error_message(e))
self._sock = sock
- self._selector = DefaultSelector(sock)
try:
self.on_connect()
except RedisError:
@@ -538,9 +594,6 @@ class Connection(object):
self._parser.on_disconnect()
if self._sock is None:
return
- if self._selector is not None:
- self._selector.close()
- self._selector = None
try:
if os.getpid() == self.pid:
self._sock.shutdown(socket.SHUT_RDWR)
@@ -585,11 +638,7 @@ class Connection(object):
if not sock:
self.connect()
sock = self._sock
- return self._parser.can_read() or self._selector.can_read(timeout)
-
- def is_ready_for_command(self):
- "Check if the connection is ready for a command"
- return self._selector.is_ready_for_command()
+ return self._parser.can_read(timeout)
def read_response(self):
"Read the response from a previously sent command"
@@ -963,10 +1012,13 @@ class ConnectionPool(object):
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
- if not connection.is_ready_for_command():
+ try:
+ if connection.can_read():
+ raise ConnectionError('Connection has data')
+ except ConnectionError:
connection.disconnect()
connection.connect()
- if not connection.is_ready_for_command():
+ if connection.can_read():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
@@ -1111,10 +1163,13 @@ class BlockingConnectionPool(ConnectionPool):
# a command. if not, the connection was either returned to the
# pool before all data has been read or the socket has been
# closed. either way, reconnect and verify everything is good.
- if not connection.is_ready_for_command():
+ try:
+ if connection.can_read():
+ raise ConnectionError('Connection has data')
+ except ConnectionError:
connection.disconnect()
connection.connect()
- if not connection.is_ready_for_command():
+ if connection.can_read():
raise ConnectionError('Connection not ready')
except: # noqa: E722
# release the connection back to the pool so that we don't leak it
diff --git a/redis/selector.py b/redis/selector.py
deleted file mode 100644
index bce84a5..0000000
--- a/redis/selector.py
+++ /dev/null
@@ -1,196 +0,0 @@
-import errno
-import select
-from redis.exceptions import RedisError
-
-
-_DEFAULT_SELECTOR = None
-
-
-class BaseSelector(object):
- """
- Base class for all Selectors
- """
- def __init__(self, sock):
- self.sock = sock
-
- def can_read(self, timeout=0):
- """
- Return True if data is ready to be read from the socket,
- otherwise False.
-
- This doesn't guarentee that the socket is still connected, just that
- there is data to read.
-
- Automatically retries EINTR errors based on PEP 475.
- """
- while True:
- try:
- return self.check_can_read(timeout)
- except (select.error, IOError) as ex:
- if self.errno_from_exception(ex) == errno.EINTR:
- continue
- return False
-
- def is_ready_for_command(self, timeout=0):
- """
- Return True if the socket is ready to send a command,
- otherwise False.
-
- Automatically retries EINTR errors based on PEP 475.
- """
- while True:
- try:
- return self.check_is_ready_for_command(timeout)
- except (select.error, IOError) as ex:
- if self.errno_from_exception(ex) == errno.EINTR:
- continue
- return False
-
- def check_can_read(self, timeout):
- """
- Perform the can_read check. Subclasses should implement this.
- """
- raise NotImplementedError
-
- def check_is_ready_for_command(self, timeout):
- """
- Perform the is_ready_for_command check. Subclasses should
- implement this.
- """
- raise NotImplementedError
-
- def close(self):
- """
- Close the selector.
- """
- self.sock = None
-
- def errno_from_exception(self, ex):
- """
- Get the error number from an exception
- """
- if hasattr(ex, 'errno'):
- return ex.errno
- elif ex.args:
- return ex.args[0]
- else:
- return None
-
-
-if hasattr(select, 'select'):
- class SelectSelector(BaseSelector):
- """
- A select-based selector that should work on most platforms.
-
- This is the worst poll strategy and should only be used if no other
- option is available.
- """
- def check_can_read(self, timeout):
- """
- Return True if data is ready to be read from the socket,
- otherwise False.
-
- This doesn't guarentee that the socket is still connected, just
- that there is data to read.
- """
- return bool(select.select([self.sock], [], [], timeout)[0])
-
- def check_is_ready_for_command(self, timeout):
- """
- Return True if the socket is ready to send a command,
- otherwise False.
- """
- r, w, e = select.select([self.sock], [self.sock], [self.sock],
- timeout)
- return bool(w and not r and not e)
-
-
-if hasattr(select, 'poll'):
- class PollSelector(BaseSelector):
- """
- A poll-based selector that should work on (almost?) all versions
- of Unix
- """
- READ_MASK = select.POLLIN | select.POLLPRI
- ERROR_MASK = select.POLLERR | select.POLLHUP
- WRITE_MASK = select.POLLOUT
-
- _READ_POLLER_MASK = READ_MASK | ERROR_MASK
- _READY_POLLER_MASK = READ_MASK | ERROR_MASK | WRITE_MASK
-
- def __init__(self, sock):
- super(PollSelector, self).__init__(sock)
- self.read_poller = select.poll()
- self.read_poller.register(sock, self._READ_POLLER_MASK)
- self.ready_poller = select.poll()
- self.ready_poller.register(sock, self._READY_POLLER_MASK)
-
- def close(self):
- """
- Close the selector.
- """
- for poller in (self.read_poller, self.ready_poller):
- try:
- poller.unregister(self.sock)
- except (KeyError, ValueError):
- # KeyError is raised if somehow the socket was not
- # registered
- # ValueError is raised if the socket's file descriptor is
- # negative.
- # In either case, we can't do anything better than to
- # remove the reference to the poller.
- pass
- self.read_poller = None
- self.ready_poller = None
- self.sock = None
-
- def check_can_read(self, timeout=0):
- """
- Return True if data is ready to be read from the socket,
- otherwise False.
-
- This doesn't guarentee that the socket is still connected, just
- that there is data to read.
- """
- timeout = int(timeout * 1000)
- events = self.read_poller.poll(timeout)
- return bool(events and events[0][1] & self.READ_MASK)
-
- def check_is_ready_for_command(self, timeout=0):
- """
- Return True if the socket is ready to send a command,
- otherwise False
- """
- timeout = timeout * 1000
- events = self.ready_poller.poll(timeout)
- return bool(events and events[0][1] == self.WRITE_MASK)
-
-
-def has_selector(selector):
- "Determine if the current platform has the selector available"
- try:
- if selector == 'poll':
- # the select module offers the poll selector even if the platform
- # doesn't support it. Attempt to poll for nothing to make sure
- # poll is available
- p = select.poll()
- p.poll(0)
- else:
- # the other selectors will fail when instantiated
- getattr(select, selector)().close()
- return True
- except (OSError, AttributeError):
- return False
-
-
-def DefaultSelector(sock):
- "Return the best selector for the platform"
- global _DEFAULT_SELECTOR
- if _DEFAULT_SELECTOR is None:
- if has_selector('poll'):
- _DEFAULT_SELECTOR = PollSelector
- elif hasattr(select, 'select'):
- _DEFAULT_SELECTOR = SelectSelector
- else:
- raise RedisError('Platform does not support any selectors')
- return _DEFAULT_SELECTOR(sock)
diff --git a/tests/test_connection_pool.py b/tests/test_connection_pool.py
index f258411..2aea1e4 100644
--- a/tests/test_connection_pool.py
+++ b/tests/test_connection_pool.py
@@ -19,8 +19,8 @@ class DummyConnection(object):
def connect(self):
pass
- def is_ready_for_command(self):
- return True
+ def can_read(self):
+ return False
class TestConnectionPool(object):
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py
index fc91abf..7f94b4a 100644
--- a/tests/test_pubsub.py
+++ b/tests/test_pubsub.py
@@ -490,3 +490,16 @@ class TestPubSubPings(object):
assert wait_for_message(p) == make_message(type='pong', channel=None,
data='hello world',
pattern=None)
+
+
+class TestPubSubConnectionKilled(object):
+
+ @skip_if_server_version_lt('3.0.0')
+ def test_connection_error_raised_when_connection_dies(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ p.subscribe('foo')
+ for client in r.client_list():
+ if client['cmd'] == 'subscribe':
+ r.client_kill_filter(_id=client['id'])
+ with pytest.raises(ConnectionError):
+ wait_for_message(p)
diff --git a/tests/test_selector.py b/tests/test_selector.py
deleted file mode 100644
index 07bd6dc..0000000
--- a/tests/test_selector.py
+++ /dev/null
@@ -1,122 +0,0 @@
-import pytest
-import time
-from redis import selector
-
-_SELECTORS = (
- 'SelectSelector',
- 'PollSelector',
-)
-
-
-@pytest.mark.parametrize('selector_name', _SELECTORS)
-class TestSelector(object):
-
- @pytest.fixture()
- def selector_patch(self, selector_name, request):
- "A fixture to patch the DefaultSelector with each selector"
- if not hasattr(selector, selector_name):
- pytest.skip('selector %s unavailable' % selector_name)
- default_selector = selector._DEFAULT_SELECTOR
-
- def revert_selector():
- selector._DEFAULT_SELECTOR = default_selector
- request.addfinalizer(revert_selector)
-
- selector._DEFAULT_SELECTOR = getattr(selector, selector_name)
-
- def kill_connection(self, connection, r):
- "Helper that tells the redis server to kill `connection`"
- # set a name for the connection so that we can identify and kill it
- connection.send_command('client', 'setname', 'redis-py-1')
- assert connection.read_response() == b'OK'
-
- # find the client based on its name and kill it
- for client in r.client_list():
- if client['name'] == 'redis-py-1':
- assert r.client_kill(client['addr'])
- break
- else:
- assert False, 'Client redis-py-1 not found in client list'
-
- def test_can_read(self, selector_patch, r):
- c = r.connection_pool.get_connection('_')
-
- # a fresh connection should not be readable
- assert not c.can_read()
-
- c.send_command('PING')
- # a connection should be readable when a response is available
- # note that we supply a timeout here to make sure the server has
- # a chance to respond
- assert c.can_read(1.0)
-
- assert c.read_response() == b'PONG'
-
- # once the response is read, the connection is no longer readable
- assert not c.can_read()
-
- def test_is_ready_for_command(self, selector_patch, r):
- c = r.connection_pool.get_connection('_')
-
- # a fresh connection should be ready for a new command
- assert c.is_ready_for_command()
-
- c.send_command('PING')
- # once the server replies with a response, the selector should report
- # that the connection is no longer ready since there is data that
- # can be read. note that we need to wait for the server to respond
- wait_until = time.time() + 2
- while time.time() < wait_until:
- if not c.is_ready_for_command():
- break
- time.sleep(0.01)
-
- assert not c.is_ready_for_command()
-
- assert c.read_response() == b'PONG'
-
- # once the response is read, the connection should be ready again
- assert c.is_ready_for_command()
-
- def test_killed_connection_no_longer_ready(self, selector_patch, r):
- "A connection that becomes disconnected is no longer ready"
- c = r.connection_pool.get_connection('_')
- # the connection should start as ready
- assert c.is_ready_for_command()
-
- self.kill_connection(c, r)
-
- # the selector should immediately report that the socket is no
- # longer ready
- assert not c.is_ready_for_command()
-
- def test_pool_restores_killed_connection(self, selector_patch, r2):
- """
- The ConnectionPool only returns healthy connecdtions, even if the
- connection was killed while idle in the pool.
- """
- # r2 provides two separate clients/connection pools
- r = r2[0]
- c = r.connection_pool.get_connection('_')
- c._test_client = True
- # the connection should start as ready
- assert c.is_ready_for_command()
-
- # release the connection back to the pool
- r.connection_pool.release(c)
-
- # kill the connection that is now idle in the pool
- # use the second redis client/pool instance run the kill command
- # such that it doesn't manipulate the primary connection pool
- self.kill_connection(c, r2[1])
-
- assert not c.is_ready_for_command()
-
- # retrieving the connection from the pool should provide us with
- # the same connection we were previously using and it should now
- # be ready for a command
- c2 = r.connection_pool.get_connection('_')
- assert c2 == c
- assert c2._test_client is True
-
- assert c.is_ready_for_command()