summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-11-13 17:03:47 +0000
committerAsk Solem <ask@celeryproject.org>2013-11-13 17:03:47 +0000
commit140811f91b39603d5f116f04255fa00afa254e87 (patch)
tree5e1f109236082111809d18ae57b0b68a2b08dc0c
parentd4d1918631ac84d532d7af5fa952c68bb289ec5a (diff)
downloadpy-amqp-2.0-devel.tar.gz
humble beginnings2.0-devel
-rw-r--r--amqp/async.py13
-rw-r--r--amqp/transport.py51
2 files changed, 64 insertions, 0 deletions
diff --git a/amqp/async.py b/amqp/async.py
new file mode 100644
index 0000000..f1f802c
--- /dev/null
+++ b/amqp/async.py
@@ -0,0 +1,13 @@
+from __future__ import absolute_import
+
+_current_loop = None
+
+
+def get_event_loop():
+ return _current_loop
+
+
+def set_event_loop(loop):
+ global _current_loop
+ _current_loop = loop
+ return loop
diff --git a/amqp/transport.py b/amqp/transport.py
index 975ced1..79eb2f2 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -157,6 +157,57 @@ class _AbstractTransport(object):
self.sock = None
self.connected = False
+ def _read_frame(self, then, unpack_from=unpack_from):
+ read = self._read
+
+ buf = bytearray(7)
+ bufv = memoryview(buf)
+ Hr = Br = Cr = 0
+ while Hr < 7:
+ try:
+ n = self.read(bufv[Hr:], 7 - Hr)
+ except (OSError, IOError, socket.error) as exc:
+ if exc.errno not in _UNAVIL:
+ raise
+ yield
+ if n == 0:
+ raise ConnectionError('socket disconnected')
+ Hr += n
+
+ frame_type, channel, size = unpack_from('>BHI', bufv)
+
+ buf = bytearray(size)
+ bufv = memoryview(buf)
+ while Br < size:
+ try:
+ n = read(bufv[Br:], size - Br)
+ except (OSError, IOError, socket.error) as exc:
+ if exc.errno not in _UNAVAIL:
+ raise
+ yield
+ if n == 0:
+ raise ConnectionError('socket disconnected')
+ Br += n
+
+ chbuf = bytearray(size)
+ chbufv = memoryview(chbuf)
+
+ while Cr < 1:
+ try:
+ n = read(chbuf[Cr:], 1 - Cr)
+ except (OSError, IOError, socket.error) as exc:
+ if exc.errno not in _UNAVAIL:
+ raise
+ yield
+ if n == 0:
+ raise ConnectionError('socket disconnected')
+ Cr += n
+
+ if ord(chbuf[0]) != 206:
+ raise UnexpectedFrame(
+ 'Received 0x{0:02x} while expecting 0xce'.format(chbuf[0]))
+ then(frame_type, channel, payload)
+
def read_frame(self, unpack=unpack):
read = self._read
try: