1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
import asyncio
import socket
import types
from unittest.mock import patch
import pytest
from redis.asyncio.connection import Connection, UnixDomainSocketConnection
from redis.asyncio.retry import Retry
from redis.backoff import NoBackoff
from redis.exceptions import ConnectionError, InvalidResponse, TimeoutError
from redis.parsers import _AsyncRESP2Parser
from tests.conftest import skip_if_server_version_lt
from .compat import mock
@pytest.mark.onlynoncluster
async def test_invalid_response(create_redis):
r = await create_redis(single_connection_client=True)
raw = b"x"
parser: "_AsyncRESP2Parser" = r.connection._parser
if not isinstance(parser, _AsyncRESP2Parser):
pytest.skip("PythonParser only")
stream_mock = mock.Mock(parser._stream)
stream_mock.readline.return_value = raw + b"\r\n"
with mock.patch.object(parser, "_stream", stream_mock):
with pytest.raises(InvalidResponse) as cm:
await parser.read_response()
assert str(cm.value) == f"Protocol Error: {raw!r}"
@skip_if_server_version_lt("4.0.0")
@pytest.mark.redismod
@pytest.mark.onlynoncluster
async def test_loading_external_modules(modclient):
def inner():
pass
modclient.load_external_module("myfuncname", inner)
assert getattr(modclient, "myfuncname") == inner
assert isinstance(getattr(modclient, "myfuncname"), types.FunctionType)
# and call it
from redis.commands import RedisModuleCommands
j = RedisModuleCommands.json
modclient.load_external_module("sometestfuncname", j)
# d = {'hello': 'world!'}
# mod = j(modclient)
# mod.set("fookey", ".", d)
# assert mod.get('fookey') == d
async def test_socket_param_regression(r):
"""A regression test for issue #1060"""
conn = UnixDomainSocketConnection()
_ = await conn.disconnect() is True
async def test_can_run_concurrent_commands(r):
if getattr(r, "connection", None) is not None:
# Concurrent commands are only supported on pooled or cluster connections
# since there is no synchronization on a single connection.
pytest.skip("pool only")
assert await r.ping() is True
assert all(await asyncio.gather(*(r.ping() for _ in range(10))))
async def test_connect_retry_on_timeout_error():
"""Test that the _connect function is retried in case of a timeout"""
conn = Connection(retry_on_timeout=True, retry=Retry(NoBackoff(), 3))
origin_connect = conn._connect
conn._connect = mock.AsyncMock()
async def mock_connect():
# connect only on the last retry
if conn._connect.call_count <= 2:
raise socket.timeout
else:
return await origin_connect()
conn._connect.side_effect = mock_connect
await conn.connect()
assert conn._connect.call_count == 3
async def test_connect_without_retry_on_os_error():
"""Test that the _connect function is not being retried in case of a OSError"""
with patch.object(Connection, "_connect") as _connect:
_connect.side_effect = OSError("")
conn = Connection(retry_on_timeout=True, retry=Retry(NoBackoff(), 2))
with pytest.raises(ConnectionError):
await conn.connect()
assert _connect.call_count == 1
async def test_connect_timeout_error_without_retry():
"""Test that the _connect function is not being retried if retry_on_timeout is
set to False"""
conn = Connection(retry_on_timeout=False)
conn._connect = mock.AsyncMock()
conn._connect.side_effect = socket.timeout
with pytest.raises(TimeoutError) as e:
await conn.connect()
assert conn._connect.call_count == 1
assert str(e.value) == "Timeout connecting to server"
|