import os import pytest import redis import time import re from threading import Thread from redis.client import parse_client_list from redis.connection import ssl_available, to_bool from .conftest import skip_if_server_version_lt class DummyConnection(object): description_format = "DummyConnection<>" def __init__(self, **kwargs): self.kwargs = kwargs self.pid = os.getpid() def connect(self): pass def is_ready_for_command(self): return True class TestConnectionPool(object): def get_pool(self, connection_kwargs=None, max_connections=None, connection_class=redis.Connection): connection_kwargs = connection_kwargs or {} pool = redis.ConnectionPool( connection_class=connection_class, max_connections=max_connections, **connection_kwargs) return pool def test_connection_creation(self): connection_kwargs = {'foo': 'bar', 'biz': 'baz'} pool = self.get_pool(connection_kwargs=connection_kwargs, connection_class=DummyConnection) connection = pool.get_connection('_') assert isinstance(connection, DummyConnection) assert connection.kwargs == connection_kwargs def test_multiple_connections(self): pool = self.get_pool() c1 = pool.get_connection('_') c2 = pool.get_connection('_') assert c1 != c2 def test_max_connections(self): pool = self.get_pool(max_connections=2) pool.get_connection('_') pool.get_connection('_') with pytest.raises(redis.ConnectionError): pool.get_connection('_') def test_reuse_previously_released_connection(self): pool = self.get_pool() c1 = pool.get_connection('_') pool.release(c1) c2 = pool.get_connection('_') assert c1 == c2 def test_repr_contains_db_info_tcp(self): connection_kwargs = {'host': 'localhost', 'port': 6379, 'db': 1} pool = self.get_pool(connection_kwargs=connection_kwargs, connection_class=redis.Connection) expected = 'ConnectionPool>' assert repr(pool) == expected def test_repr_contains_db_info_unix(self): connection_kwargs = {'path': '/abc', 'db': 1} pool = self.get_pool(connection_kwargs=connection_kwargs, connection_class=redis.UnixDomainSocketConnection) expected = 'ConnectionPool>' assert repr(pool) == expected def test_pool_provides_healthy_connections(self): pool = self.get_pool(connection_class=redis.Connection, max_connections=2) conn1 = pool.get_connection('_') conn2 = pool.get_connection('_') # set a unique name on the connection we'll be testing conn1._same_connection_value = 'killed-client' conn1.send_command('client', 'setname', 'redis-py-1') assert conn1.read_response() == b'OK' pool.release(conn1) # find the well named client in the client list conn2.send_command('client', 'list') client_list = parse_client_list(conn2.read_response()) for client in client_list: if client['name'] == 'redis-py-1': break else: assert False, 'Client redis-py-1 not found in client list' # kill the well named client conn2.send_command('client', 'kill', client['addr']) assert conn2.read_response() == b'OK' # our connection should have been disconnected, but a quality # connection pool would know this and only provide a healthy # connection. conn = pool.get_connection('_') assert conn == conn1 conn.send_command('ping') assert conn.read_response() == b'PONG' class TestBlockingConnectionPool(object): def get_pool(self, connection_kwargs=None, max_connections=10, timeout=20): connection_kwargs = connection_kwargs or {} pool = redis.BlockingConnectionPool(connection_class=DummyConnection, max_connections=max_connections, timeout=timeout, **connection_kwargs) return pool def test_connection_creation(self): connection_kwargs = {'foo': 'bar', 'biz': 'baz'} pool = self.get_pool(connection_kwargs=connection_kwargs) connection = pool.get_connection('_') assert isinstance(connection, DummyConnection) assert connection.kwargs == connection_kwargs def test_multiple_connections(self): pool = self.get_pool() c1 = pool.get_connection('_') c2 = pool.get_connection('_') assert c1 != c2 def test_connection_pool_blocks_until_timeout(self): "When out of connections, block for timeout seconds, then raise" pool = self.get_pool(max_connections=1, timeout=0.1) pool.get_connection('_') start = time.time() with pytest.raises(redis.ConnectionError): pool.get_connection('_') # we should have waited at least 0.1 seconds assert time.time() - start >= 0.1 def connection_pool_blocks_until_another_connection_released(self): """ When out of connections, block until another connection is released to the pool """ pool = self.get_pool(max_connections=1, timeout=2) c1 = pool.get_connection('_') def target(): time.sleep(0.1) pool.release(c1) Thread(target=target).start() start = time.time() pool.get_connection('_') assert time.time() - start >= 0.1 def test_reuse_previously_released_connection(self): pool = self.get_pool() c1 = pool.get_connection('_') pool.release(c1) c2 = pool.get_connection('_') assert c1 == c2 def test_repr_contains_db_info_tcp(self): pool = redis.ConnectionPool(host='localhost', port=6379, db=0) expected = 'ConnectionPool>' assert repr(pool) == expected def test_repr_contains_db_info_unix(self): pool = redis.ConnectionPool( connection_class=redis.UnixDomainSocketConnection, path='abc', db=0, ) expected = 'ConnectionPool>' assert repr(pool) == expected class TestConnectionPoolURLParsing(object): def test_defaults(self): pool = redis.ConnectionPool.from_url('redis://localhost') assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'localhost', 'port': 6379, 'db': 0, 'password': None, } def test_hostname(self): pool = redis.ConnectionPool.from_url('redis://myhost') assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'myhost', 'port': 6379, 'db': 0, 'password': None, } def test_quoted_hostname(self): pool = redis.ConnectionPool.from_url('redis://my %2F host %2B%3D+', decode_components=True) assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'my / host +=+', 'port': 6379, 'db': 0, 'password': None, } def test_port(self): pool = redis.ConnectionPool.from_url('redis://localhost:6380') assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'localhost', 'port': 6380, 'db': 0, 'password': None, } def test_password(self): pool = redis.ConnectionPool.from_url('redis://:mypassword@localhost') assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'localhost', 'port': 6379, 'db': 0, 'password': 'mypassword', } def test_quoted_password(self): pool = redis.ConnectionPool.from_url( 'redis://:%2Fmypass%2F%2B word%3D%24+@localhost', decode_components=True) assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'localhost', 'port': 6379, 'db': 0, 'password': '/mypass/+ word=$+', } def test_db_as_argument(self): pool = redis.ConnectionPool.from_url('redis://localhost', db='1') assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'localhost', 'port': 6379, 'db': 1, 'password': None, } def test_db_in_path(self): pool = redis.ConnectionPool.from_url('redis://localhost/2', db='1') assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'localhost', 'port': 6379, 'db': 2, 'password': None, } def test_db_in_querystring(self): pool = redis.ConnectionPool.from_url('redis://localhost/2?db=3', db='1') assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'localhost', 'port': 6379, 'db': 3, 'password': None, } def test_extra_typed_querystring_options(self): pool = redis.ConnectionPool.from_url( 'redis://localhost/2?socket_timeout=20&socket_connect_timeout=10' '&socket_keepalive=&retry_on_timeout=Yes&max_connections=10' ) assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'localhost', 'port': 6379, 'db': 2, 'socket_timeout': 20.0, 'socket_connect_timeout': 10.0, 'retry_on_timeout': True, 'password': None, } assert pool.max_connections == 10 def test_boolean_parsing(self): for expected, value in ( (None, None), (None, ''), (False, 0), (False, '0'), (False, 'f'), (False, 'F'), (False, 'False'), (False, 'n'), (False, 'N'), (False, 'No'), (True, 1), (True, '1'), (True, 'y'), (True, 'Y'), (True, 'Yes'), ): assert expected is to_bool(value) def test_invalid_extra_typed_querystring_options(self): import warnings with warnings.catch_warnings(record=True) as warning_log: redis.ConnectionPool.from_url( 'redis://localhost/2?socket_timeout=_&' 'socket_connect_timeout=abc' ) # Compare the message values assert [ str(m.message) for m in sorted(warning_log, key=lambda l: str(l.message)) ] == [ 'Invalid value for `socket_connect_timeout` in connection URL.', 'Invalid value for `socket_timeout` in connection URL.', ] def test_extra_querystring_options(self): pool = redis.ConnectionPool.from_url('redis://localhost?a=1&b=2') assert pool.connection_class == redis.Connection assert pool.connection_kwargs == { 'host': 'localhost', 'port': 6379, 'db': 0, 'password': None, 'a': '1', 'b': '2' } def test_calling_from_subclass_returns_correct_instance(self): pool = redis.BlockingConnectionPool.from_url('redis://localhost') assert isinstance(pool, redis.BlockingConnectionPool) def test_client_creates_connection_pool(self): r = redis.Redis.from_url('redis://myhost') assert r.connection_pool.connection_class == redis.Connection assert r.connection_pool.connection_kwargs == { 'host': 'myhost', 'port': 6379, 'db': 0, 'password': None, } def test_invalid_scheme_raises_error(self): with pytest.raises(ValueError): redis.ConnectionPool.from_url('localhost') class TestConnectionPoolUnixSocketURLParsing(object): def test_defaults(self): pool = redis.ConnectionPool.from_url('unix:///socket') assert pool.connection_class == redis.UnixDomainSocketConnection assert pool.connection_kwargs == { 'path': '/socket', 'db': 0, 'password': None, } def test_password(self): pool = redis.ConnectionPool.from_url('unix://:mypassword@/socket') assert pool.connection_class == redis.UnixDomainSocketConnection assert pool.connection_kwargs == { 'path': '/socket', 'db': 0, 'password': 'mypassword', } def test_quoted_password(self): pool = redis.ConnectionPool.from_url( 'unix://:%2Fmypass%2F%2B word%3D%24+@/socket', decode_components=True) assert pool.connection_class == redis.UnixDomainSocketConnection assert pool.connection_kwargs == { 'path': '/socket', 'db': 0, 'password': '/mypass/+ word=$+', } def test_quoted_path(self): pool = redis.ConnectionPool.from_url( 'unix://:mypassword@/my%2Fpath%2Fto%2F..%2F+_%2B%3D%24ocket', decode_components=True) assert pool.connection_class == redis.UnixDomainSocketConnection assert pool.connection_kwargs == { 'path': '/my/path/to/../+_+=$ocket', 'db': 0, 'password': 'mypassword', } def test_db_as_argument(self): pool = redis.ConnectionPool.from_url('unix:///socket', db=1) assert pool.connection_class == redis.UnixDomainSocketConnection assert pool.connection_kwargs == { 'path': '/socket', 'db': 1, 'password': None, } def test_db_in_querystring(self): pool = redis.ConnectionPool.from_url('unix:///socket?db=2', db=1) assert pool.connection_class == redis.UnixDomainSocketConnection assert pool.connection_kwargs == { 'path': '/socket', 'db': 2, 'password': None, } def test_extra_querystring_options(self): pool = redis.ConnectionPool.from_url('unix:///socket?a=1&b=2') assert pool.connection_class == redis.UnixDomainSocketConnection assert pool.connection_kwargs == { 'path': '/socket', 'db': 0, 'password': None, 'a': '1', 'b': '2' } class TestSSLConnectionURLParsing(object): @pytest.mark.skipif(not ssl_available, reason="SSL not installed") def test_defaults(self): pool = redis.ConnectionPool.from_url('rediss://localhost') assert pool.connection_class == redis.SSLConnection assert pool.connection_kwargs == { 'host': 'localhost', 'port': 6379, 'db': 0, 'password': None, } @pytest.mark.skipif(not ssl_available, reason="SSL not installed") def test_cert_reqs_options(self): import ssl class DummyConnectionPool(redis.ConnectionPool): def get_connection(self, *args, **kwargs): return self.make_connection() pool = DummyConnectionPool.from_url( 'rediss://?ssl_cert_reqs=none') assert pool.get_connection('_').cert_reqs == ssl.CERT_NONE pool = DummyConnectionPool.from_url( 'rediss://?ssl_cert_reqs=optional') assert pool.get_connection('_').cert_reqs == ssl.CERT_OPTIONAL pool = DummyConnectionPool.from_url( 'rediss://?ssl_cert_reqs=required') assert pool.get_connection('_').cert_reqs == ssl.CERT_REQUIRED class TestConnection(object): def test_on_connect_error(self): """ An error in Connection.on_connect should disconnect from the server see for details: https://github.com/andymccurdy/redis-py/issues/368 """ # this assumes the Redis server being tested against doesn't have # 9999 databases ;) bad_connection = redis.Redis(db=9999) # an error should be raised on connect with pytest.raises(redis.RedisError): bad_connection.info() pool = bad_connection.connection_pool assert len(pool._available_connections) == 1 assert not pool._available_connections[0]._sock @skip_if_server_version_lt('2.8.8') def test_busy_loading_disconnects_socket(self, r): """ If Redis raises a LOADING error, the connection should be disconnected and a BusyLoadingError raised """ with pytest.raises(redis.BusyLoadingError): r.execute_command('DEBUG', 'ERROR', 'LOADING fake message') pool = r.connection_pool assert len(pool._available_connections) == 1 assert not pool._available_connections[0]._sock @skip_if_server_version_lt('2.8.8') def test_busy_loading_from_pipeline_immediate_command(self, r): """ BusyLoadingErrors should raise from Pipelines that execute a command immediately, like WATCH does. """ pipe = r.pipeline() with pytest.raises(redis.BusyLoadingError): pipe.immediate_execute_command('DEBUG', 'ERROR', 'LOADING fake message') pool = r.connection_pool assert not pipe.connection assert len(pool._available_connections) == 1 assert not pool._available_connections[0]._sock @skip_if_server_version_lt('2.8.8') def test_busy_loading_from_pipeline(self, r): """ BusyLoadingErrors should be raised from a pipeline execution regardless of the raise_on_error flag. """ pipe = r.pipeline() pipe.execute_command('DEBUG', 'ERROR', 'LOADING fake message') with pytest.raises(redis.BusyLoadingError): pipe.execute() pool = r.connection_pool assert not pipe.connection assert len(pool._available_connections) == 1 assert not pool._available_connections[0]._sock @skip_if_server_version_lt('2.8.8') def test_read_only_error(self, r): "READONLY errors get turned in ReadOnlyError exceptions" with pytest.raises(redis.ReadOnlyError): r.execute_command('DEBUG', 'ERROR', 'READONLY blah blah') def test_connect_from_url_tcp(self): connection = redis.Redis.from_url('redis://localhost') pool = connection.connection_pool assert re.match('(.*)<(.*)<(.*)>>', repr(pool)).groups() == ( 'ConnectionPool', 'Connection', 'host=localhost,port=6379,db=0', ) def test_connect_from_url_unix(self): connection = redis.Redis.from_url('unix:///path/to/socket') pool = connection.connection_pool assert re.match('(.*)<(.*)<(.*)>>', repr(pool)).groups() == ( 'ConnectionPool', 'UnixDomainSocketConnection', 'path=/path/to/socket,db=0', ) def test_can_read(self, r): connection = r.connection_pool.get_connection('ping') assert not connection.can_read() connection.send_command('ping') # wait for the server to respond wait_until = time.time() + 2 while time.time() < wait_until: if connection.can_read(): break time.sleep(0.01) assert connection.can_read() assert connection.read_response() == b'PONG' assert not connection.can_read()