1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
import asyncio
import socket
import types
from unittest.mock import patch
import pytest
import redis
from redis.asyncio.connection import (
BaseParser,
Connection,
PythonParser,
UnixDomainSocketConnection,
)
from redis.asyncio.retry import Retry
from redis.backoff import NoBackoff
from redis.exceptions import ConnectionError, InvalidResponse, TimeoutError
from tests.conftest import skip_if_server_version_lt
from .compat import mock
from .mocks import MockStream
@pytest.mark.onlynoncluster
async def test_invalid_response(create_redis):
r = await create_redis(single_connection_client=True)
raw = b"x"
fake_stream = MockStream(raw + b"\r\n")
parser: BaseParser = r.connection._parser
with mock.patch.object(parser, "_stream", fake_stream):
with pytest.raises(InvalidResponse) as cm:
await parser.read_response()
if isinstance(parser, PythonParser):
assert str(cm.value) == f"Protocol Error: {raw!r}"
else:
assert (
str(cm.value) == f'Protocol error, got "{raw.decode()}" as reply type byte'
)
await r.connection.disconnect()
@skip_if_server_version_lt("4.0.0")
@pytest.mark.redismod
@pytest.mark.onlynoncluster
async def test_loading_external_modules(modclient):
def inner():
pass
modclient.load_external_module("myfuncname", inner)
assert getattr(modclient, "myfuncname") == inner
assert isinstance(getattr(modclient, "myfuncname"), types.FunctionType)
# and call it
from redis.commands import RedisModuleCommands
j = RedisModuleCommands.json
modclient.load_external_module("sometestfuncname", j)
# d = {'hello': 'world!'}
# mod = j(modclient)
# mod.set("fookey", ".", d)
# assert mod.get('fookey') == d
async def test_socket_param_regression(r):
"""A regression test for issue #1060"""
conn = UnixDomainSocketConnection()
_ = await conn.disconnect() is True
async def test_can_run_concurrent_commands(r):
if getattr(r, "connection", None) is not None:
# Concurrent commands are only supported on pooled or cluster connections
# since there is no synchronization on a single connection.
pytest.skip("pool only")
assert await r.ping() is True
assert all(await asyncio.gather(*(r.ping() for _ in range(10))))
async def test_connect_retry_on_timeout_error():
"""Test that the _connect function is retried in case of a timeout"""
conn = Connection(retry_on_timeout=True, retry=Retry(NoBackoff(), 3))
origin_connect = conn._connect
conn._connect = mock.AsyncMock()
async def mock_connect():
# connect only on the last retry
if conn._connect.call_count <= 2:
raise socket.timeout
else:
return await origin_connect()
conn._connect.side_effect = mock_connect
await conn.connect()
assert conn._connect.call_count == 3
async def test_connect_without_retry_on_os_error():
"""Test that the _connect function is not being retried in case of a OSError"""
with patch.object(Connection, "_connect") as _connect:
_connect.side_effect = OSError("")
conn = Connection(retry_on_timeout=True, retry=Retry(NoBackoff(), 2))
with pytest.raises(ConnectionError):
await conn.connect()
assert _connect.call_count == 1
async def test_connect_timeout_error_without_retry():
"""Test that the _connect function is not being retried if retry_on_timeout is
set to False"""
conn = Connection(retry_on_timeout=False)
conn._connect = mock.AsyncMock()
conn._connect.side_effect = socket.timeout
with pytest.raises(TimeoutError) as e:
await conn.connect()
assert conn._connect.call_count == 1
assert str(e.value) == "Timeout connecting to server"
@pytest.mark.onlynoncluster
async def test_connection_parse_response_resume(r: redis.Redis):
"""
This test verifies that the Connection parser,
be that PythonParser or HiredisParser,
can be interrupted at IO time and then resume parsing.
"""
conn = Connection(**r.connection_pool.connection_kwargs)
await conn.connect()
message = (
b"*3\r\n$7\r\nmessage\r\n$8\r\nchannel1\r\n"
b"$25\r\nhi\r\nthere\r\n+how\r\nare\r\nyou\r\n"
)
conn._parser._stream = MockStream(message, interrupt_every=2)
for i in range(100):
try:
response = await conn.read_response()
break
except MockStream.TestError:
pass
else:
pytest.fail("didn't receive a response")
assert response
assert i > 0
|