summaryrefslogtreecommitdiff
path: root/amqp/transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'amqp/transport.py')
-rw-r--r--amqp/transport.py51
1 files changed, 51 insertions, 0 deletions
diff --git a/amqp/transport.py b/amqp/transport.py
index 975ced1..79eb2f2 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -157,6 +157,57 @@ class _AbstractTransport(object):
self.sock = None
self.connected = False
+ def _read_frame(self, then, unpack_from=unpack_from):
+ read = self._read
+
+ buf = bytearray(7)
+ bufv = memoryview(buf)
+ Hr = Br = Cr = 0
+ while Hr < 7:
+ try:
+ n = self.read(bufv[Hr:], 7 - Hr)
+ except (OSError, IOError, socket.error) as exc:
+ if exc.errno not in _UNAVIL:
+ raise
+ yield
+ if n == 0:
+ raise ConnectionError('socket disconnected')
+ Hr += n
+
+ frame_type, channel, size = unpack_from('>BHI', bufv)
+
+ buf = bytearray(size)
+ bufv = memoryview(buf)
+ while Br < size:
+ try:
+ n = read(bufv[Br:], size - Br)
+ except (OSError, IOError, socket.error) as exc:
+ if exc.errno not in _UNAVAIL:
+ raise
+ yield
+ if n == 0:
+ raise ConnectionError('socket disconnected')
+ Br += n
+
+ chbuf = bytearray(size)
+ chbufv = memoryview(chbuf)
+
+ while Cr < 1:
+ try:
+ n = read(chbuf[Cr:], 1 - Cr)
+ except (OSError, IOError, socket.error) as exc:
+ if exc.errno not in _UNAVAIL:
+ raise
+ yield
+ if n == 0:
+ raise ConnectionError('socket disconnected')
+ Cr += n
+
+ if ord(chbuf[0]) != 206:
+ raise UnexpectedFrame(
+ 'Received 0x{0:02x} while expecting 0xce'.format(chbuf[0]))
+ then(frame_type, channel, payload)
+
def read_frame(self, unpack=unpack):
read = self._read
try: