summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTommie Gannert <tommie@spotify.com>2013-11-08 17:22:24 +0100
committerAsk Solem <ask@celeryproject.org>2013-11-11 15:23:07 +0000
commit79fd8dbd153200f2a382147d160ad2bde5009eef (patch)
tree6066c9fdc34593819aca69c21de8f9cfd1003110
parenta6cdb3b80bd6e6488c80492dcd8d63910270b645 (diff)
downloadpy-amqp-79fd8dbd153200f2a382147d160ad2bde5009eef.tar.gz
Save data read in transport before raising exception.
If the exception raised is due to a timeout, we might read from the transport again. In the old code, the data was discarded, causing loss of synchronization. This ensures the data we see is preserved for next time. Conflicts: amqp/transport.py
-rw-r--r--amqp/transport.py53
1 files changed, 30 insertions, 23 deletions
diff --git a/amqp/transport.py b/amqp/transport.py
index 21d293a..975ced1 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -229,19 +229,22 @@ class SSLTransport(_AbstractTransport):
# to get the exact number of bytes wanted.
recv = self._quick_recv
rbuf = self._read_buffer
- while len(rbuf) < n:
- try:
- s = recv(131072) # see note above
- except socket.error as exc:
- # ssl.sock.read may cause ENOENT if the
- # operation couldn't be performed (Issue celery#1414).
- if not initial and exc.errno in _errnos:
- continue
- raise exc
- if not s:
- raise IOError('Socket closed')
- rbuf += s
-
+ try:
+ while len(rbuf) < n:
+ try:
+ s = recv(131072) # see note above
+ except socket.error as exc:
+ # ssl.sock.read may cause ENOENT if the
+ # operation couldn't be performed (Issue celery#1414).
+ if not initial and exc.errno in _errnos:
+ continue
+ raise
+ if not s:
+ raise IOError('Socket closed')
+ rbuf += s
+ except:
+ self._read_buffer = rbuf
+ raise
result, self._read_buffer = rbuf[:n], rbuf[n:]
return result
@@ -269,16 +272,20 @@ class TCPTransport(_AbstractTransport):
"""Read exactly n bytes from the socket"""
recv = self._quick_recv
rbuf = self._read_buffer
- while len(rbuf) < n:
- try:
- s = recv(131072)
- except socket.error as exc:
- if not initial and exc.errno in _errnos:
- continue
- raise
- if not s:
- raise IOError('Socket closed')
- rbuf += s
+ try:
+ while len(rbuf) < n:
+ try:
+ s = recv(131072)
+ except socket.error as exc:
+ if not initial and exc.errno in _errnos:
+ continue
+ raise
+ if not s:
+ raise IOError('Socket closed')
+ rbuf += s
+ except:
+ self._read_buffer = rbuf
+ raise
result, self._read_buffer = rbuf[:n], rbuf[n:]
return result