diff options
author | Tommie Gannert <tommie@spotify.com> | 2013-11-08 17:22:24 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2013-11-11 15:23:07 +0000 |
commit | 79fd8dbd153200f2a382147d160ad2bde5009eef (patch) | |
tree | 6066c9fdc34593819aca69c21de8f9cfd1003110 | |
parent | a6cdb3b80bd6e6488c80492dcd8d63910270b645 (diff) | |
download | py-amqp-79fd8dbd153200f2a382147d160ad2bde5009eef.tar.gz |
Save data read in transport before raising exception.
If the exception raised is due to a timeout, we might read from the transport
again. In the old code, the data was discarded, causing loss of
synchronization. This ensures the data we see is preserved for next time.
Conflicts:
amqp/transport.py
-rw-r--r-- | amqp/transport.py | 53 |
1 files changed, 30 insertions, 23 deletions
diff --git a/amqp/transport.py b/amqp/transport.py index 21d293a..975ced1 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -229,19 +229,22 @@ class SSLTransport(_AbstractTransport): # to get the exact number of bytes wanted. recv = self._quick_recv rbuf = self._read_buffer - while len(rbuf) < n: - try: - s = recv(131072) # see note above - except socket.error as exc: - # ssl.sock.read may cause ENOENT if the - # operation couldn't be performed (Issue celery#1414). - if not initial and exc.errno in _errnos: - continue - raise exc - if not s: - raise IOError('Socket closed') - rbuf += s - + try: + while len(rbuf) < n: + try: + s = recv(131072) # see note above + except socket.error as exc: + # ssl.sock.read may cause ENOENT if the + # operation couldn't be performed (Issue celery#1414). + if not initial and exc.errno in _errnos: + continue + raise + if not s: + raise IOError('Socket closed') + rbuf += s + except: + self._read_buffer = rbuf + raise result, self._read_buffer = rbuf[:n], rbuf[n:] return result @@ -269,16 +272,20 @@ class TCPTransport(_AbstractTransport): """Read exactly n bytes from the socket""" recv = self._quick_recv rbuf = self._read_buffer - while len(rbuf) < n: - try: - s = recv(131072) - except socket.error as exc: - if not initial and exc.errno in _errnos: - continue - raise - if not s: - raise IOError('Socket closed') - rbuf += s + try: + while len(rbuf) < n: + try: + s = recv(131072) + except socket.error as exc: + if not initial and exc.errno in _errnos: + continue + raise + if not s: + raise IOError('Socket closed') + rbuf += s + except: + self._read_buffer = rbuf + raise result, self._read_buffer = rbuf[:n], rbuf[n:] return result |