diff options
author | dvora-h <67596500+dvora-h@users.noreply.github.com> | 2023-03-23 12:52:01 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-23 12:52:01 +0200 |
commit | 753018ebc23021ba726253f3962a69a0e363d41f (patch) | |
tree | 26e1442a42b95b7a5b5f2900ef694b68f381be82 /redis/parsers/resp2.py | |
parent | 66a4d6b2a493dd3a20cc299ab5fef3c14baad965 (diff) | |
download | redis-py-753018ebc23021ba726253f3962a69a0e363d41f.tar.gz |
Reorganizing the parsers code, and add support for RESP3 (#2574)
* Reorganizing the parsers code
* fix build package
* fix imports
* fix flake8
* add resp to Connection class
* core commands
* python resp3 parser
* pipeline
* async resp3 parser
* some asymc tests
* resp3 parser for async cluster
* async commands tests
* linters
* linters
* linters
* fix ModuleNotFoundError
* fix tests
* fix assert_resp_response_in
* fix command_getkeys in cluster
* fail-fast false
* version
---------
Co-authored-by: Chayim I. Kirshen <c@kirshen.com>
Diffstat (limited to 'redis/parsers/resp2.py')
-rw-r--r-- | redis/parsers/resp2.py | 131 |
1 files changed, 131 insertions, 0 deletions
diff --git a/redis/parsers/resp2.py b/redis/parsers/resp2.py new file mode 100644 index 0000000..0acd211 --- /dev/null +++ b/redis/parsers/resp2.py @@ -0,0 +1,131 @@ +from typing import Any, Union + +from ..exceptions import ConnectionError, InvalidResponse, ResponseError +from ..typing import EncodableT +from .base import _AsyncRESPBase, _RESPBase +from .socket import SERVER_CLOSED_CONNECTION_ERROR + + +class _RESP2Parser(_RESPBase): + """RESP2 protocol implementation""" + + def read_response(self, disable_decoding=False): + pos = self._buffer.get_pos() + try: + result = self._read_response(disable_decoding=disable_decoding) + except BaseException: + self._buffer.rewind(pos) + raise + else: + self._buffer.purge() + return result + + def _read_response(self, disable_decoding=False): + raw = self._buffer.readline() + if not raw: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + + byte, response = raw[:1], raw[1:] + + # server returned an error + if byte == b"-": + response = response.decode("utf-8", errors="replace") + error = self.parse_error(response) + # if the error is a ConnectionError, raise immediately so the user + # is notified + if isinstance(error, ConnectionError): + raise error + # otherwise, we're dealing with a ResponseError that might belong + # inside a pipeline response. the connection's read_response() + # and/or the pipeline's execute() will raise this error if + # necessary, so just return the exception instance here. + return error + # single value + elif byte == b"+": + pass + # int value + elif byte == b":": + return int(response) + # bulk response + elif byte == b"$" and response == b"-1": + return None + elif byte == b"$": + response = self._buffer.read(int(response)) + # multi-bulk response + elif byte == b"*" and response == b"-1": + return None + elif byte == b"*": + response = [ + self._read_response(disable_decoding=disable_decoding) + for i in range(int(response)) + ] + else: + raise InvalidResponse(f"Protocol Error: {raw!r}") + + if disable_decoding is False: + response = self.encoder.decode(response) + return response + + +class _AsyncRESP2Parser(_AsyncRESPBase): + """Async class for the RESP2 protocol""" + + async def read_response(self, disable_decoding: bool = False): + if not self._connected: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + if self._chunks: + # augment parsing buffer with previously read data + self._buffer += b"".join(self._chunks) + self._chunks.clear() + self._pos = 0 + response = await self._read_response(disable_decoding=disable_decoding) + # Successfully parsing a response allows us to clear our parsing buffer + self._clear() + return response + + async def _read_response( + self, disable_decoding: bool = False + ) -> Union[EncodableT, ResponseError, None]: + raw = await self._readline() + response: Any + byte, response = raw[:1], raw[1:] + + # server returned an error + if byte == b"-": + response = response.decode("utf-8", errors="replace") + error = self.parse_error(response) + # if the error is a ConnectionError, raise immediately so the user + # is notified + if isinstance(error, ConnectionError): + self._clear() # Successful parse + raise error + # otherwise, we're dealing with a ResponseError that might belong + # inside a pipeline response. the connection's read_response() + # and/or the pipeline's execute() will raise this error if + # necessary, so just return the exception instance here. + return error + # single value + elif byte == b"+": + pass + # int value + elif byte == b":": + return int(response) + # bulk response + elif byte == b"$" and response == b"-1": + return None + elif byte == b"$": + response = await self._read(int(response)) + # multi-bulk response + elif byte == b"*" and response == b"-1": + return None + elif byte == b"*": + response = [ + (await self._read_response(disable_decoding)) + for _ in range(int(response)) # noqa + ] + else: + raise InvalidResponse(f"Protocol Error: {raw!r}") + + if disable_decoding is False: + response = self.encoder.decode(response) + return response |