diff options
author | Ask Solem <ask@celeryproject.org> | 2013-04-30 15:19:33 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2013-04-30 15:19:33 +0100 |
commit | 5ea2a77d094b9ff9462008493c55de8c080855ca (patch) | |
tree | c49dbca880ff9261dbf58c04c703a93d36489b60 | |
parent | 61436d6466425950db66f1921b82de06f55674f1 (diff) | |
download | py-amqp-async.tar.gz |
async-experimentsasync
-rw-r--r-- | amqp/__init__.py | 2 | ||||
-rw-r--r-- | amqp/method_framing.py | 110 | ||||
-rw-r--r-- | amqp/transport.py | 32 | ||||
-rw-r--r-- | amqp/utils.py | 61 |
4 files changed, 156 insertions, 49 deletions
diff --git a/amqp/__init__.py b/amqp/__init__.py index 919d993..ae91684 100644 --- a/amqp/__init__.py +++ b/amqp/__init__.py @@ -61,9 +61,11 @@ from .exceptions import ( # noqa error_for_code, __all__ as _all_exceptions, ) +from .utils import promise # noqa __all__ = [ 'Connection', 'Channel', 'Message', + 'promise', ] + _all_exceptions diff --git a/amqp/method_framing.py b/amqp/method_framing.py index d564d0d..135b922 100644 --- a/amqp/method_framing.py +++ b/amqp/method_framing.py @@ -29,6 +29,7 @@ from .basic_message import Message from .exceptions import AMQPError, UnexpectedFrame from .five import Queue, range, string from .serialization import AMQPReader +from .utils import noop, promise __all__ = ['MethodReader'] @@ -94,40 +95,40 @@ class MethodReader(object): self.expected_types = defaultdict(lambda: 1) # not an actual byte count, just incremented whenever we receive self.bytes_recv = 0 + self.read_frame = self.source.read_frame + self.put_method = self.queue.put + + def on_method_error(self, exc): + self.put_message(exc) + + def on_method_received(self, frame_type, channel, payload): + self.bytes_recv += 1 + + if frame_type not in (self.expected_types[channel], 8): + self.put_method(( + channel, + UnexpectedFrame( + 'Received frame {0} while expecting type: {1}'.format( + frame_type, self.expected_types[channel]), + ) + )) + elif frame_type == 1: + self._process_method_frame(channel, payload) + elif frame_type == 2: + self._process_content_header(channel, payload) + elif frame_type == 3: + self._process_content_body(channel, payload) + elif frame_type == 8: + self._process_heartbeat(channel, payload) def _next_method(self): """Read the next method from the source, once one complete method has been assembled it is placed in the internal queue.""" empty = self.queue.empty - read_frame = self.source.read_frame + read_frame = self.read_frame while empty(): - try: - frame_type, channel, payload = read_frame() - except Exception as exc: - # - # Connection was closed? Framing Error? - # - self.queue.put(exc) - break - - self.bytes_recv += 1 - - if frame_type not in (self.expected_types[channel], 8): - self.queue.put(( - channel, - UnexpectedFrame( - 'Received frame {0} while expecting type: {1}'.format( - frame_type, self.expected_types[channel]), - ) - )) - elif frame_type == 1: - self._process_method_frame(channel, payload) - elif frame_type == 2: - self._process_content_header(channel, payload) - elif frame_type == 3: - self._process_content_body(channel, payload) - elif frame_type == 8: - self._process_heartbeat(channel, payload) + read_frame(promise(self.on_method_received, + on_error=self.on_method_error)) def _process_heartbeat(self, channel, payload): self.heartbeats += 1 @@ -198,8 +199,43 @@ class MethodWriter(object): self.dest = dest self.frame_max = frame_max self.bytes_sent = 0 + self.write_frame = self.dest.write_frame + self.chunk_size = self.frame_max - 8 + + def on_header_frame_sent(self, channel, method_sig, body, properties, + on_complete): + print('HEADER FRAME SENT: %r' % (on_complete, )) + body_size = len(body) + payload = pack('>HHQ', method_sig[0], 0, len(body)) + properties + if body: + callback = promise(self.write_body, (channel, buffer(body), + 0, body_size, on_complete)) + else: + callback = None + self.write_frame(1, channel, payload, callback) + if not body: + on_complete() + + def on_bytes_sent(self): + self.bytes_sent += 1 - def write_method(self, channel, method_sig, args, content=None): + def write_body(self, channel, buf, offset, size, on_complete=None): + chunk_size = self.chunk_size + offset = offset or 0 + print('SIZE: %r OFFSET: %r' % (size, offset)) + if offset >= size: + print('BUF COMPLETE') + return on_complete() + + callback = promise(self.write_body, + (channel, buf, offset + self.chunk_size, size, + on_complete)) + self.write_frame(3, channel, buf[offset:offset + chunk_size], + callback) + + def write_method(self, channel, method_sig, args, + content=None, on_complete=None): + on_complete = on_complete or noop() write_frame = self.dest.write_frame payload = pack('>HH', method_sig[0], method_sig[1]) + args @@ -216,14 +252,12 @@ class MethodWriter(object): body = body.encode(coding) properties = content._serialize_properties() - write_frame(1, channel, payload) - if content: - payload = pack('>HHQ', method_sig[0], 0, len(body)) + properties - - write_frame(2, channel, payload) + callback = promise( + self.on_header_frame_sent, + (channel, method_sig, body, properties, on_complete), + ) + else: + callback = promise(self.on_bytes_sent).then(on_complete) + write_frame(1, channel, payload, callback) - chunk_size = self.frame_max - 8 - for i in range(0, len(body), chunk_size): - write_frame(3, channel, body[i:i + chunk_size]) - self.bytes_sent += 1 diff --git a/amqp/transport.py b/amqp/transport.py index 6563bd2..cc6922a 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -49,6 +49,7 @@ except: from struct import pack, unpack from .exceptions import UnexpectedFrame +from .utils import noop AMQP_PORT = 5672 @@ -139,22 +140,31 @@ class _AbstractTransport(object): self.sock.close() self.sock = None - def read_frame(self): + def read_frame(self, callback): frame_type, channel, size = unpack('>BHI', self._read(7, True)) payload = self._read(size) ch = ord(self._read(1)) if ch == 206: # '\xce' - return frame_type, channel, payload + callback(frame_type, channel, payload) else: - raise UnexpectedFrame( - 'Received 0x{0:02x} while expecting 0xce'.format(ch)) - - def write_frame(self, frame_type, channel, payload): - size = len(payload) - self._write(pack( - '>BHI%dsB' % size, - frame_type, channel, size, payload, 0xce, - )) + callback.throw(UnexpectedFrame( + 'Received 0x{0:02x} while expecting 0xce'.format(ch), + )) + + def write_frame(self, frame_type, channel, payload, callback): + callback = callback or noop() + try: + size = len(payload) + print('WRITE: %r' % (str(payload), )) + self._write(pack( + '>BHI%dsB' % size, + frame_type, channel, size, str(payload), 0xce, + )) + except Exception as exc: + print('CALLBACK: %r' % (callback, )) + callback.throw(exc) + else: + callback() class SSLTransport(_AbstractTransport): diff --git a/amqp/utils.py b/amqp/utils.py new file mode 100644 index 0000000..05dbc93 --- /dev/null +++ b/amqp/utils.py @@ -0,0 +1,61 @@ +from __future__ import absolute_import + +import sys + + +class promise(object): + if not hasattr(sys, 'pypy_version_info'): + __slots__ = tuple( + 'fun args kwargs value ready failed on_success on_error'.split() + ) + + def __init__(self, fun, args=(), kwargs=(), + on_success=None, on_error=None): + self.fun = fun + self.args = args + self.kwargs = kwargs + self.ready = False + self.failed = False + self.on_success = on_success + self.on_error = on_error + self.value = None + + def __repr__(self): + return '<$: {0.fun.__name__}(*{0.args!r}, **{0.kwargs!r})'.format( + self, + ) + + def __call__(self, *args, **kwargs): + try: + self.value = self.fun( + *self.args + args if self.args else args, + **dict(self.kwargs, **kwargs) if self.kwargs else kwargs + ) + except Exception as exc: + self.set_error_state(exc) + else: + if self.on_success: + self.on_success(self.value) + finally: + self.ready = True + + def then(self, callback=None, on_error=None): + self.on_success = callback + self.on_error = on_error + return callback + + def set_error_state(self, exc): + self.failed = True + if self.on_error is None: + raise + self.on_error(exc) + + def throw(self, exc): + try: + raise exc + except exc.__class__ as with_cause: + self.set_error_state(with_cause) + + +def noop(): + return promise(lambda *a, **k: None) |