diff options
Diffstat (limited to 'amqp/transport.py')
-rw-r--r-- | amqp/transport.py | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/amqp/transport.py b/amqp/transport.py index 6563bd2..cc6922a 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -49,6 +49,7 @@ except: from struct import pack, unpack from .exceptions import UnexpectedFrame +from .utils import noop AMQP_PORT = 5672 @@ -139,22 +140,31 @@ class _AbstractTransport(object): self.sock.close() self.sock = None - def read_frame(self): + def read_frame(self, callback): frame_type, channel, size = unpack('>BHI', self._read(7, True)) payload = self._read(size) ch = ord(self._read(1)) if ch == 206: # '\xce' - return frame_type, channel, payload + callback(frame_type, channel, payload) else: - raise UnexpectedFrame( - 'Received 0x{0:02x} while expecting 0xce'.format(ch)) - - def write_frame(self, frame_type, channel, payload): - size = len(payload) - self._write(pack( - '>BHI%dsB' % size, - frame_type, channel, size, payload, 0xce, - )) + callback.throw(UnexpectedFrame( + 'Received 0x{0:02x} while expecting 0xce'.format(ch), + )) + + def write_frame(self, frame_type, channel, payload, callback): + callback = callback or noop() + try: + size = len(payload) + print('WRITE: %r' % (str(payload), )) + self._write(pack( + '>BHI%dsB' % size, + frame_type, channel, size, str(payload), 0xce, + )) + except Exception as exc: + print('CALLBACK: %r' % (callback, )) + callback.throw(exc) + else: + callback() class SSLTransport(_AbstractTransport): |