summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-04-30 15:19:33 +0100
committerAsk Solem <ask@celeryproject.org>2013-04-30 15:19:33 +0100
commit5ea2a77d094b9ff9462008493c55de8c080855ca (patch)
treec49dbca880ff9261dbf58c04c703a93d36489b60
parent61436d6466425950db66f1921b82de06f55674f1 (diff)
downloadpy-amqp-async.tar.gz
async-experimentsasync
-rw-r--r--amqp/__init__.py2
-rw-r--r--amqp/method_framing.py110
-rw-r--r--amqp/transport.py32
-rw-r--r--amqp/utils.py61
4 files changed, 156 insertions, 49 deletions
diff --git a/amqp/__init__.py b/amqp/__init__.py
index 919d993..ae91684 100644
--- a/amqp/__init__.py
+++ b/amqp/__init__.py
@@ -61,9 +61,11 @@ from .exceptions import ( # noqa
error_for_code,
__all__ as _all_exceptions,
)
+from .utils import promise # noqa
__all__ = [
'Connection',
'Channel',
'Message',
+ 'promise',
] + _all_exceptions
diff --git a/amqp/method_framing.py b/amqp/method_framing.py
index d564d0d..135b922 100644
--- a/amqp/method_framing.py
+++ b/amqp/method_framing.py
@@ -29,6 +29,7 @@ from .basic_message import Message
from .exceptions import AMQPError, UnexpectedFrame
from .five import Queue, range, string
from .serialization import AMQPReader
+from .utils import noop, promise
__all__ = ['MethodReader']
@@ -94,40 +95,40 @@ class MethodReader(object):
self.expected_types = defaultdict(lambda: 1)
# not an actual byte count, just incremented whenever we receive
self.bytes_recv = 0
+ self.read_frame = self.source.read_frame
+ self.put_method = self.queue.put
+
+ def on_method_error(self, exc):
+ self.put_message(exc)
+
+ def on_method_received(self, frame_type, channel, payload):
+ self.bytes_recv += 1
+
+ if frame_type not in (self.expected_types[channel], 8):
+ self.put_method((
+ channel,
+ UnexpectedFrame(
+ 'Received frame {0} while expecting type: {1}'.format(
+ frame_type, self.expected_types[channel]),
+ )
+ ))
+ elif frame_type == 1:
+ self._process_method_frame(channel, payload)
+ elif frame_type == 2:
+ self._process_content_header(channel, payload)
+ elif frame_type == 3:
+ self._process_content_body(channel, payload)
+ elif frame_type == 8:
+ self._process_heartbeat(channel, payload)
def _next_method(self):
"""Read the next method from the source, once one complete method has
been assembled it is placed in the internal queue."""
empty = self.queue.empty
- read_frame = self.source.read_frame
+ read_frame = self.read_frame
while empty():
- try:
- frame_type, channel, payload = read_frame()
- except Exception as exc:
- #
- # Connection was closed? Framing Error?
- #
- self.queue.put(exc)
- break
-
- self.bytes_recv += 1
-
- if frame_type not in (self.expected_types[channel], 8):
- self.queue.put((
- channel,
- UnexpectedFrame(
- 'Received frame {0} while expecting type: {1}'.format(
- frame_type, self.expected_types[channel]),
- )
- ))
- elif frame_type == 1:
- self._process_method_frame(channel, payload)
- elif frame_type == 2:
- self._process_content_header(channel, payload)
- elif frame_type == 3:
- self._process_content_body(channel, payload)
- elif frame_type == 8:
- self._process_heartbeat(channel, payload)
+ read_frame(promise(self.on_method_received,
+ on_error=self.on_method_error))
def _process_heartbeat(self, channel, payload):
self.heartbeats += 1
@@ -198,8 +199,43 @@ class MethodWriter(object):
self.dest = dest
self.frame_max = frame_max
self.bytes_sent = 0
+ self.write_frame = self.dest.write_frame
+ self.chunk_size = self.frame_max - 8
+
+ def on_header_frame_sent(self, channel, method_sig, body, properties,
+ on_complete):
+ print('HEADER FRAME SENT: %r' % (on_complete, ))
+ body_size = len(body)
+ payload = pack('>HHQ', method_sig[0], 0, len(body)) + properties
+ if body:
+ callback = promise(self.write_body, (channel, buffer(body),
+ 0, body_size, on_complete))
+ else:
+ callback = None
+ self.write_frame(1, channel, payload, callback)
+ if not body:
+ on_complete()
+
+ def on_bytes_sent(self):
+ self.bytes_sent += 1
- def write_method(self, channel, method_sig, args, content=None):
+ def write_body(self, channel, buf, offset, size, on_complete=None):
+ chunk_size = self.chunk_size
+ offset = offset or 0
+ print('SIZE: %r OFFSET: %r' % (size, offset))
+ if offset >= size:
+ print('BUF COMPLETE')
+ return on_complete()
+
+ callback = promise(self.write_body,
+ (channel, buf, offset + self.chunk_size, size,
+ on_complete))
+ self.write_frame(3, channel, buf[offset:offset + chunk_size],
+ callback)
+
+ def write_method(self, channel, method_sig, args,
+ content=None, on_complete=None):
+ on_complete = on_complete or noop()
write_frame = self.dest.write_frame
payload = pack('>HH', method_sig[0], method_sig[1]) + args
@@ -216,14 +252,12 @@ class MethodWriter(object):
body = body.encode(coding)
properties = content._serialize_properties()
- write_frame(1, channel, payload)
-
if content:
- payload = pack('>HHQ', method_sig[0], 0, len(body)) + properties
-
- write_frame(2, channel, payload)
+ callback = promise(
+ self.on_header_frame_sent,
+ (channel, method_sig, body, properties, on_complete),
+ )
+ else:
+ callback = promise(self.on_bytes_sent).then(on_complete)
+ write_frame(1, channel, payload, callback)
- chunk_size = self.frame_max - 8
- for i in range(0, len(body), chunk_size):
- write_frame(3, channel, body[i:i + chunk_size])
- self.bytes_sent += 1
diff --git a/amqp/transport.py b/amqp/transport.py
index 6563bd2..cc6922a 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -49,6 +49,7 @@ except:
from struct import pack, unpack
from .exceptions import UnexpectedFrame
+from .utils import noop
AMQP_PORT = 5672
@@ -139,22 +140,31 @@ class _AbstractTransport(object):
self.sock.close()
self.sock = None
- def read_frame(self):
+ def read_frame(self, callback):
frame_type, channel, size = unpack('>BHI', self._read(7, True))
payload = self._read(size)
ch = ord(self._read(1))
if ch == 206: # '\xce'
- return frame_type, channel, payload
+ callback(frame_type, channel, payload)
else:
- raise UnexpectedFrame(
- 'Received 0x{0:02x} while expecting 0xce'.format(ch))
-
- def write_frame(self, frame_type, channel, payload):
- size = len(payload)
- self._write(pack(
- '>BHI%dsB' % size,
- frame_type, channel, size, payload, 0xce,
- ))
+ callback.throw(UnexpectedFrame(
+ 'Received 0x{0:02x} while expecting 0xce'.format(ch),
+ ))
+
+ def write_frame(self, frame_type, channel, payload, callback):
+ callback = callback or noop()
+ try:
+ size = len(payload)
+ print('WRITE: %r' % (str(payload), ))
+ self._write(pack(
+ '>BHI%dsB' % size,
+ frame_type, channel, size, str(payload), 0xce,
+ ))
+ except Exception as exc:
+ print('CALLBACK: %r' % (callback, ))
+ callback.throw(exc)
+ else:
+ callback()
class SSLTransport(_AbstractTransport):
diff --git a/amqp/utils.py b/amqp/utils.py
new file mode 100644
index 0000000..05dbc93
--- /dev/null
+++ b/amqp/utils.py
@@ -0,0 +1,61 @@
+from __future__ import absolute_import
+
+import sys
+
+
+class promise(object):
+ if not hasattr(sys, 'pypy_version_info'):
+ __slots__ = tuple(
+ 'fun args kwargs value ready failed on_success on_error'.split()
+ )
+
+ def __init__(self, fun, args=(), kwargs=(),
+ on_success=None, on_error=None):
+ self.fun = fun
+ self.args = args
+ self.kwargs = kwargs
+ self.ready = False
+ self.failed = False
+ self.on_success = on_success
+ self.on_error = on_error
+ self.value = None
+
+ def __repr__(self):
+ return '<$: {0.fun.__name__}(*{0.args!r}, **{0.kwargs!r})'.format(
+ self,
+ )
+
+ def __call__(self, *args, **kwargs):
+ try:
+ self.value = self.fun(
+ *self.args + args if self.args else args,
+ **dict(self.kwargs, **kwargs) if self.kwargs else kwargs
+ )
+ except Exception as exc:
+ self.set_error_state(exc)
+ else:
+ if self.on_success:
+ self.on_success(self.value)
+ finally:
+ self.ready = True
+
+ def then(self, callback=None, on_error=None):
+ self.on_success = callback
+ self.on_error = on_error
+ return callback
+
+ def set_error_state(self, exc):
+ self.failed = True
+ if self.on_error is None:
+ raise
+ self.on_error(exc)
+
+ def throw(self, exc):
+ try:
+ raise exc
+ except exc.__class__ as with_cause:
+ self.set_error_state(with_cause)
+
+
+def noop():
+ return promise(lambda *a, **k: None)