summaryrefslogtreecommitdiff
path: root/tests/test_asyncio/test_connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_asyncio/test_connection.py')
-rw-r--r--tests/test_asyncio/test_connection.py82
1 files changed, 82 insertions, 0 deletions
diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py
index 8e4fdac..1851ca9 100644
--- a/tests/test_asyncio/test_connection.py
+++ b/tests/test_asyncio/test_connection.py
@@ -10,12 +10,14 @@ from redis.asyncio import Redis
from redis.asyncio.connection import (
BaseParser,
Connection,
+ HiredisParser,
PythonParser,
UnixDomainSocketConnection,
)
from redis.asyncio.retry import Retry
from redis.backoff import NoBackoff
from redis.exceptions import ConnectionError, InvalidResponse, TimeoutError
+from redis.utils import HIREDIS_AVAILABLE
from tests.conftest import skip_if_server_version_lt
from .compat import mock
@@ -191,3 +193,83 @@ async def test_connection_parse_response_resume(r: redis.Redis):
pytest.fail("didn't receive a response")
assert response
assert i > 0
+
+
+@pytest.mark.onlynoncluster
+@pytest.mark.parametrize(
+ "parser_class", [PythonParser, HiredisParser], ids=["PythonParser", "HiredisParser"]
+)
+async def test_connection_disconect_race(parser_class):
+ """
+ This test reproduces the case in issue #2349
+ where a connection is closed while the parser is reading to feed the
+ internal buffer.The stream `read()` will succeed, but when it returns,
+ another task has already called `disconnect()` and is waiting for
+ close to finish. When we attempts to feed the buffer, we will fail
+ since the buffer is no longer there.
+
+ This test verifies that a read in progress can finish even
+ if the `disconnect()` method is called.
+ """
+ if parser_class == PythonParser:
+ pytest.xfail("doesn't work yet with PythonParser")
+ if parser_class == HiredisParser and not HIREDIS_AVAILABLE:
+ pytest.skip("Hiredis not available")
+
+ args = {}
+ args["parser_class"] = parser_class
+
+ conn = Connection(**args)
+
+ cond = asyncio.Condition()
+ # 0 == initial
+ # 1 == reader is reading
+ # 2 == closer has closed and is waiting for close to finish
+ state = 0
+
+ # Mock read function, which wait for a close to happen before returning
+ # Can either be invoked as two `read()` calls (HiredisParser)
+ # or as a `readline()` followed by `readexact()` (PythonParser)
+ chunks = [b"$13\r\n", b"Hello, World!\r\n"]
+
+ async def read(_=None):
+ nonlocal state
+ async with cond:
+ if state == 0:
+ state = 1 # we are reading
+ cond.notify()
+ # wait until the closing task has done
+ await cond.wait_for(lambda: state == 2)
+ return chunks.pop(0)
+
+ # function closes the connection while reader is still blocked reading
+ async def do_close():
+ nonlocal state
+ async with cond:
+ await cond.wait_for(lambda: state == 1)
+ state = 2
+ cond.notify()
+ await conn.disconnect()
+
+ async def do_read():
+ return await conn.read_response()
+
+ reader = mock.AsyncMock()
+ writer = mock.AsyncMock()
+ writer.transport = mock.Mock()
+ writer.transport.get_extra_info.side_effect = None
+
+ # for HiredisParser
+ reader.read.side_effect = read
+ # for PythonParser
+ reader.readline.side_effect = read
+ reader.readexactly.side_effect = read
+
+ async def open_connection(*args, **kwargs):
+ return reader, writer
+
+ with patch.object(asyncio, "open_connection", open_connection):
+ await conn.connect()
+
+ vals = await asyncio.gather(do_read(), do_close())
+ assert vals == [b"Hello, World!", None]