summaryrefslogtreecommitdiff
path: root/amqp/transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'amqp/transport.py')
-rw-r--r--amqp/transport.py32
1 files changed, 21 insertions, 11 deletions
diff --git a/amqp/transport.py b/amqp/transport.py
index 6563bd2..cc6922a 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -49,6 +49,7 @@ except:
from struct import pack, unpack
from .exceptions import UnexpectedFrame
+from .utils import noop
AMQP_PORT = 5672
@@ -139,22 +140,31 @@ class _AbstractTransport(object):
self.sock.close()
self.sock = None
- def read_frame(self):
+ def read_frame(self, callback):
frame_type, channel, size = unpack('>BHI', self._read(7, True))
payload = self._read(size)
ch = ord(self._read(1))
if ch == 206: # '\xce'
- return frame_type, channel, payload
+ callback(frame_type, channel, payload)
else:
- raise UnexpectedFrame(
- 'Received 0x{0:02x} while expecting 0xce'.format(ch))
-
- def write_frame(self, frame_type, channel, payload):
- size = len(payload)
- self._write(pack(
- '>BHI%dsB' % size,
- frame_type, channel, size, payload, 0xce,
- ))
+ callback.throw(UnexpectedFrame(
+ 'Received 0x{0:02x} while expecting 0xce'.format(ch),
+ ))
+
+ def write_frame(self, frame_type, channel, payload, callback):
+ callback = callback or noop()
+ try:
+ size = len(payload)
+ print('WRITE: %r' % (str(payload), ))
+ self._write(pack(
+ '>BHI%dsB' % size,
+ frame_type, channel, size, str(payload), 0xce,
+ ))
+ except Exception as exc:
+ print('CALLBACK: %r' % (callback, ))
+ callback.throw(exc)
+ else:
+ callback()
class SSLTransport(_AbstractTransport):