summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKristján Valur Jónsson <sweskman@gmail.com>2023-03-16 14:23:40 +0000
committerGitHub <noreply@github.com>2023-03-16 16:23:40 +0200
commit1b2f408259405d412d7530291902f9e0c8bd34b3 (patch)
treed6436b372818830362f4ac0a5bed8dca8b5eb5f6
parent7d474f90453c7b90bd06c94e0250b618120a599d (diff)
downloadredis-py-1b2f408259405d412d7530291902f9e0c8bd34b3.tar.gz
Fix behaviour of async PythonParser to match RedisParser as for issue #2349 (#2582)
* Allow data to drain from PythonParser after connection close. * Add Changes
-rw-r--r--CHANGES1
-rw-r--r--redis/asyncio/connection.py24
-rw-r--r--tests/test_asyncio/test_connection.py2
3 files changed, 12 insertions, 15 deletions
diff --git a/CHANGES b/CHANGES
index 3e4eba4..b0744c6 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,4 @@
+ * Allow data to drain from async PythonParser when reading during a disconnect()
* Use asyncio.timeout() instead of async_timeout.timeout() for python >= 3.11 (#2602)
* Add test and fix async HiredisParser when reading during a disconnect() (#2349)
* Use hiredis-py pack_command if available.
diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py
index 93db37e..057067a 100644
--- a/redis/asyncio/connection.py
+++ b/redis/asyncio/connection.py
@@ -146,7 +146,7 @@ ExceptionMappingT = Mapping[str, Union[Type[Exception], Mapping[str, Type[Except
class BaseParser:
"""Plain Python parsing class"""
- __slots__ = "_stream", "_read_size"
+ __slots__ = "_stream", "_read_size", "_connected"
EXCEPTION_CLASSES: ExceptionMappingT = {
"ERR": {
@@ -177,6 +177,7 @@ class BaseParser:
def __init__(self, socket_read_size: int):
self._stream: Optional[asyncio.StreamReader] = None
self._read_size = socket_read_size
+ self._connected = False
def __del__(self):
try:
@@ -213,7 +214,7 @@ class BaseParser:
class PythonParser(BaseParser):
"""Plain Python parsing class"""
- __slots__ = BaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks")
+ __slots__ = ("encoder", "_buffer", "_pos", "_chunks")
def __init__(self, socket_read_size: int):
super().__init__(socket_read_size)
@@ -231,21 +232,19 @@ class PythonParser(BaseParser):
self._stream = connection._reader
if self._stream is None:
raise RedisError("Buffer is closed.")
-
self.encoder = connection.encoder
+ self._clear()
+ self._connected = True
def on_disconnect(self):
"""Called when the stream disconnects"""
- if self._stream is not None:
- self._stream = None
- self.encoder = None
- self._clear()
+ self._connected = False
async def can_read_destructive(self) -> bool:
+ if not self._connected:
+ raise RedisError("Buffer is closed.")
if self._buffer:
return True
- if self._stream is None:
- raise RedisError("Buffer is closed.")
try:
async with async_timeout(0):
return await self._stream.read(1)
@@ -253,6 +252,8 @@ class PythonParser(BaseParser):
return False
async def read_response(self, disable_decoding: bool = False):
+ if not self._connected:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
if self._chunks:
# augment parsing buffer with previously read data
self._buffer += b"".join(self._chunks)
@@ -266,8 +267,6 @@ class PythonParser(BaseParser):
async def _read_response(
self, disable_decoding: bool = False
) -> Union[EncodableT, ResponseError, None]:
- if not self._stream or not self.encoder:
- raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
raw = await self._readline()
response: Any
byte, response = raw[:1], raw[1:]
@@ -354,14 +353,13 @@ class PythonParser(BaseParser):
class HiredisParser(BaseParser):
"""Parser class for connections using Hiredis"""
- __slots__ = BaseParser.__slots__ + ("_reader", "_connected")
+ __slots__ = ("_reader",)
def __init__(self, socket_read_size: int):
if not HIREDIS_AVAILABLE:
raise RedisError("Hiredis is not available.")
super().__init__(socket_read_size=socket_read_size)
self._reader: Optional[hiredis.Reader] = None
- self._connected: bool = False
def on_connect(self, connection: "Connection"):
self._stream = connection._reader
diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py
index 1851ca9..e2d77fc 100644
--- a/tests/test_asyncio/test_connection.py
+++ b/tests/test_asyncio/test_connection.py
@@ -211,8 +211,6 @@ async def test_connection_disconect_race(parser_class):
This test verifies that a read in progress can finish even
if the `disconnect()` method is called.
"""
- if parser_class == PythonParser:
- pytest.xfail("doesn't work yet with PythonParser")
if parser_class == HiredisParser and not HIREDIS_AVAILABLE:
pytest.skip("Hiredis not available")