summaryrefslogtreecommitdiff
path: root/amqp/method_framing.py
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-04-30 15:19:33 +0100
committerAsk Solem <ask@celeryproject.org>2013-04-30 15:19:33 +0100
commit5ea2a77d094b9ff9462008493c55de8c080855ca (patch)
treec49dbca880ff9261dbf58c04c703a93d36489b60 /amqp/method_framing.py
parent61436d6466425950db66f1921b82de06f55674f1 (diff)
downloadpy-amqp-async.tar.gz
async-experimentsasync
Diffstat (limited to 'amqp/method_framing.py')
-rw-r--r--amqp/method_framing.py110
1 files changed, 72 insertions, 38 deletions
diff --git a/amqp/method_framing.py b/amqp/method_framing.py
index d564d0d..135b922 100644
--- a/amqp/method_framing.py
+++ b/amqp/method_framing.py
@@ -29,6 +29,7 @@ from .basic_message import Message
from .exceptions import AMQPError, UnexpectedFrame
from .five import Queue, range, string
from .serialization import AMQPReader
+from .utils import noop, promise
__all__ = ['MethodReader']
@@ -94,40 +95,40 @@ class MethodReader(object):
self.expected_types = defaultdict(lambda: 1)
# not an actual byte count, just incremented whenever we receive
self.bytes_recv = 0
+ self.read_frame = self.source.read_frame
+ self.put_method = self.queue.put
+
+ def on_method_error(self, exc):
+ self.put_message(exc)
+
+ def on_method_received(self, frame_type, channel, payload):
+ self.bytes_recv += 1
+
+ if frame_type not in (self.expected_types[channel], 8):
+ self.put_method((
+ channel,
+ UnexpectedFrame(
+ 'Received frame {0} while expecting type: {1}'.format(
+ frame_type, self.expected_types[channel]),
+ )
+ ))
+ elif frame_type == 1:
+ self._process_method_frame(channel, payload)
+ elif frame_type == 2:
+ self._process_content_header(channel, payload)
+ elif frame_type == 3:
+ self._process_content_body(channel, payload)
+ elif frame_type == 8:
+ self._process_heartbeat(channel, payload)
def _next_method(self):
"""Read the next method from the source, once one complete method has
been assembled it is placed in the internal queue."""
empty = self.queue.empty
- read_frame = self.source.read_frame
+ read_frame = self.read_frame
while empty():
- try:
- frame_type, channel, payload = read_frame()
- except Exception as exc:
- #
- # Connection was closed? Framing Error?
- #
- self.queue.put(exc)
- break
-
- self.bytes_recv += 1
-
- if frame_type not in (self.expected_types[channel], 8):
- self.queue.put((
- channel,
- UnexpectedFrame(
- 'Received frame {0} while expecting type: {1}'.format(
- frame_type, self.expected_types[channel]),
- )
- ))
- elif frame_type == 1:
- self._process_method_frame(channel, payload)
- elif frame_type == 2:
- self._process_content_header(channel, payload)
- elif frame_type == 3:
- self._process_content_body(channel, payload)
- elif frame_type == 8:
- self._process_heartbeat(channel, payload)
+ read_frame(promise(self.on_method_received,
+ on_error=self.on_method_error))
def _process_heartbeat(self, channel, payload):
self.heartbeats += 1
@@ -198,8 +199,43 @@ class MethodWriter(object):
self.dest = dest
self.frame_max = frame_max
self.bytes_sent = 0
+ self.write_frame = self.dest.write_frame
+ self.chunk_size = self.frame_max - 8
+
+ def on_header_frame_sent(self, channel, method_sig, body, properties,
+ on_complete):
+ print('HEADER FRAME SENT: %r' % (on_complete, ))
+ body_size = len(body)
+ payload = pack('>HHQ', method_sig[0], 0, len(body)) + properties
+ if body:
+ callback = promise(self.write_body, (channel, buffer(body),
+ 0, body_size, on_complete))
+ else:
+ callback = None
+ self.write_frame(1, channel, payload, callback)
+ if not body:
+ on_complete()
+
+ def on_bytes_sent(self):
+ self.bytes_sent += 1
- def write_method(self, channel, method_sig, args, content=None):
+ def write_body(self, channel, buf, offset, size, on_complete=None):
+ chunk_size = self.chunk_size
+ offset = offset or 0
+ print('SIZE: %r OFFSET: %r' % (size, offset))
+ if offset >= size:
+ print('BUF COMPLETE')
+ return on_complete()
+
+ callback = promise(self.write_body,
+ (channel, buf, offset + self.chunk_size, size,
+ on_complete))
+ self.write_frame(3, channel, buf[offset:offset + chunk_size],
+ callback)
+
+ def write_method(self, channel, method_sig, args,
+ content=None, on_complete=None):
+ on_complete = on_complete or noop()
write_frame = self.dest.write_frame
payload = pack('>HH', method_sig[0], method_sig[1]) + args
@@ -216,14 +252,12 @@ class MethodWriter(object):
body = body.encode(coding)
properties = content._serialize_properties()
- write_frame(1, channel, payload)
-
if content:
- payload = pack('>HHQ', method_sig[0], 0, len(body)) + properties
-
- write_frame(2, channel, payload)
+ callback = promise(
+ self.on_header_frame_sent,
+ (channel, method_sig, body, properties, on_complete),
+ )
+ else:
+ callback = promise(self.on_bytes_sent).then(on_complete)
+ write_frame(1, channel, payload, callback)
- chunk_size = self.frame_max - 8
- for i in range(0, len(body), chunk_size):
- write_frame(3, channel, body[i:i + chunk_size])
- self.bytes_sent += 1