diff options
author | Ask Solem <ask@celeryproject.org> | 2013-04-30 15:19:33 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2013-04-30 15:19:33 +0100 |
commit | 5ea2a77d094b9ff9462008493c55de8c080855ca (patch) | |
tree | c49dbca880ff9261dbf58c04c703a93d36489b60 /amqp/method_framing.py | |
parent | 61436d6466425950db66f1921b82de06f55674f1 (diff) | |
download | py-amqp-async.tar.gz |
async-experimentsasync
Diffstat (limited to 'amqp/method_framing.py')
-rw-r--r-- | amqp/method_framing.py | 110 |
1 files changed, 72 insertions, 38 deletions
diff --git a/amqp/method_framing.py b/amqp/method_framing.py index d564d0d..135b922 100644 --- a/amqp/method_framing.py +++ b/amqp/method_framing.py @@ -29,6 +29,7 @@ from .basic_message import Message from .exceptions import AMQPError, UnexpectedFrame from .five import Queue, range, string from .serialization import AMQPReader +from .utils import noop, promise __all__ = ['MethodReader'] @@ -94,40 +95,40 @@ class MethodReader(object): self.expected_types = defaultdict(lambda: 1) # not an actual byte count, just incremented whenever we receive self.bytes_recv = 0 + self.read_frame = self.source.read_frame + self.put_method = self.queue.put + + def on_method_error(self, exc): + self.put_message(exc) + + def on_method_received(self, frame_type, channel, payload): + self.bytes_recv += 1 + + if frame_type not in (self.expected_types[channel], 8): + self.put_method(( + channel, + UnexpectedFrame( + 'Received frame {0} while expecting type: {1}'.format( + frame_type, self.expected_types[channel]), + ) + )) + elif frame_type == 1: + self._process_method_frame(channel, payload) + elif frame_type == 2: + self._process_content_header(channel, payload) + elif frame_type == 3: + self._process_content_body(channel, payload) + elif frame_type == 8: + self._process_heartbeat(channel, payload) def _next_method(self): """Read the next method from the source, once one complete method has been assembled it is placed in the internal queue.""" empty = self.queue.empty - read_frame = self.source.read_frame + read_frame = self.read_frame while empty(): - try: - frame_type, channel, payload = read_frame() - except Exception as exc: - # - # Connection was closed? Framing Error? - # - self.queue.put(exc) - break - - self.bytes_recv += 1 - - if frame_type not in (self.expected_types[channel], 8): - self.queue.put(( - channel, - UnexpectedFrame( - 'Received frame {0} while expecting type: {1}'.format( - frame_type, self.expected_types[channel]), - ) - )) - elif frame_type == 1: - self._process_method_frame(channel, payload) - elif frame_type == 2: - self._process_content_header(channel, payload) - elif frame_type == 3: - self._process_content_body(channel, payload) - elif frame_type == 8: - self._process_heartbeat(channel, payload) + read_frame(promise(self.on_method_received, + on_error=self.on_method_error)) def _process_heartbeat(self, channel, payload): self.heartbeats += 1 @@ -198,8 +199,43 @@ class MethodWriter(object): self.dest = dest self.frame_max = frame_max self.bytes_sent = 0 + self.write_frame = self.dest.write_frame + self.chunk_size = self.frame_max - 8 + + def on_header_frame_sent(self, channel, method_sig, body, properties, + on_complete): + print('HEADER FRAME SENT: %r' % (on_complete, )) + body_size = len(body) + payload = pack('>HHQ', method_sig[0], 0, len(body)) + properties + if body: + callback = promise(self.write_body, (channel, buffer(body), + 0, body_size, on_complete)) + else: + callback = None + self.write_frame(1, channel, payload, callback) + if not body: + on_complete() + + def on_bytes_sent(self): + self.bytes_sent += 1 - def write_method(self, channel, method_sig, args, content=None): + def write_body(self, channel, buf, offset, size, on_complete=None): + chunk_size = self.chunk_size + offset = offset or 0 + print('SIZE: %r OFFSET: %r' % (size, offset)) + if offset >= size: + print('BUF COMPLETE') + return on_complete() + + callback = promise(self.write_body, + (channel, buf, offset + self.chunk_size, size, + on_complete)) + self.write_frame(3, channel, buf[offset:offset + chunk_size], + callback) + + def write_method(self, channel, method_sig, args, + content=None, on_complete=None): + on_complete = on_complete or noop() write_frame = self.dest.write_frame payload = pack('>HH', method_sig[0], method_sig[1]) + args @@ -216,14 +252,12 @@ class MethodWriter(object): body = body.encode(coding) properties = content._serialize_properties() - write_frame(1, channel, payload) - if content: - payload = pack('>HHQ', method_sig[0], 0, len(body)) + properties - - write_frame(2, channel, payload) + callback = promise( + self.on_header_frame_sent, + (channel, method_sig, body, properties, on_complete), + ) + else: + callback = promise(self.on_bytes_sent).then(on_complete) + write_frame(1, channel, payload, callback) - chunk_size = self.frame_max - 8 - for i in range(0, len(body), chunk_size): - write_frame(3, channel, body[i:i + chunk_size]) - self.bytes_sent += 1 |