summaryrefslogtreecommitdiff
path: root/redis/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-xredis/connection.py58
1 files changed, 42 insertions, 16 deletions
diff --git a/redis/connection.py b/redis/connection.py
index b810fc5..126ea5d 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -232,12 +232,6 @@ class SocketBuffer:
self._buffer.seek(self.bytes_read)
data = self._buffer.read(length)
self.bytes_read += len(data)
-
- # purge the buffer when we've consumed it all so it doesn't
- # grow forever
- if self.bytes_read == self.bytes_written:
- self.purge()
-
return data[:-2]
def readline(self):
@@ -251,23 +245,44 @@ class SocketBuffer:
data = buf.readline()
self.bytes_read += len(data)
+ return data[:-2]
- # purge the buffer when we've consumed it all so it doesn't
- # grow forever
- if self.bytes_read == self.bytes_written:
- self.purge()
+ def get_pos(self):
+ """
+ Get current read position
+ """
+ return self.bytes_read
- return data[:-2]
+ def rewind(self, pos):
+ """
+ Rewind the buffer to a specific position, to re-start reading
+ """
+ self.bytes_read = pos
def purge(self):
- self._buffer.seek(0)
- self._buffer.truncate()
- self.bytes_written = 0
+ """
+ After a successful read, purge the read part of buffer
+ """
+ unread = self.bytes_written - self.bytes_read
+
+ # Only if we have read all of the buffer do we truncate, to
+ # reduce the amount of memory thrashing. This heuristic
+ # can be changed or removed later.
+ if unread > 0:
+ return
+
+ if unread > 0:
+ # move unread data to the front
+ view = self._buffer.getbuffer()
+ view[:unread] = view[-unread:]
+ self._buffer.truncate(unread)
+ self.bytes_written = unread
self.bytes_read = 0
+ self._buffer.seek(0)
def close(self):
try:
- self.purge()
+ self.bytes_written = self.bytes_read = 0
self._buffer.close()
except Exception:
# issue #633 suggests the purge/close somehow raised a
@@ -315,6 +330,17 @@ class PythonParser(BaseParser):
return self._buffer and self._buffer.can_read(timeout)
def read_response(self, disable_decoding=False):
+ pos = self._buffer.get_pos()
+ try:
+ result = self._read_response(disable_decoding=disable_decoding)
+ except BaseException:
+ self._buffer.rewind(pos)
+ raise
+ else:
+ self._buffer.purge()
+ return result
+
+ def _read_response(self, disable_decoding=False):
raw = self._buffer.readline()
if not raw:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
@@ -355,7 +381,7 @@ class PythonParser(BaseParser):
if length == -1:
return None
response = [
- self.read_response(disable_decoding=disable_decoding)
+ self._read_response(disable_decoding=disable_decoding)
for i in range(length)
]
if isinstance(response, bytes) and disable_decoding is False: