summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-07-22 20:18:48 +0100
committerAsk Solem <ask@celeryproject.org>2014-07-22 20:18:48 +0100
commit398a521e794831d53f2dd8594fa71046cb2df6c0 (patch)
treef58374a8b4253f0d859bd57196fb9ee9756bebc5
parent9881734d2cd5b2f0d0f9b9df158dd2d9d20e69c7 (diff)
downloadpy-amqp-callbacks.tar.gz
More workcallbacks
-rw-r--r--amqp/__init__.py10
-rw-r--r--amqp/channel.py14
-rw-r--r--amqp/connection.py92
-rw-r--r--amqp/events.py3
-rw-r--r--amqp/method_framing.py27
-rw-r--r--amqp/promise.py6
6 files changed, 92 insertions, 60 deletions
diff --git a/amqp/__init__.py b/amqp/__init__.py
index 38546e6..75d98cb 100644
--- a/amqp/__init__.py
+++ b/amqp/__init__.py
@@ -29,10 +29,10 @@ __docformat__ = 'restructuredtext'
#
# Pull in the public items from the various sub-modules
#
-from .basic_message import Message # noqa
+from .basic_message import Message # noqa
from .channel import Channel # noqa
-from .connection import Connection # noqa
-from .exceptions import ( # noqa
+from .connection import AsynConnection, BlockingConnection, Connection # noqa
+from .exceptions import ( # noqa
AMQPError,
ConnectionError,
RecoverableConnectionError,
@@ -61,9 +61,11 @@ from .exceptions import ( # noqa
error_for_code,
__all__ as _all_exceptions,
)
-from .promise import promise, barrier, maybe_promise # noqa
+from .promise import promise, barrier, maybe_promise # noqa
__all__ = [
+ 'AsynConnection',
+ 'BlockingConnection',
'Connection',
'Channel',
'Message',
diff --git a/amqp/channel.py b/amqp/channel.py
index 35bd77a..1a7a3ac 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -125,8 +125,6 @@ class Channel(object):
if self.connection.confirm_publish:
self.basic_publish = self.basic_publish_confirm
- self._x_open()
-
def then(self, on_success, on_error=None):
return self.on_open.then(on_success, on_error)
@@ -181,7 +179,7 @@ class Channel(object):
def _do_revive(self):
self.is_open = False
- self._x_open()
+ self.open()
def close(self, reply_code=0, reply_text='', method_sig=(0, 0),
argsig='BsBB', on_sent=None, callback=None):
@@ -367,7 +365,7 @@ class Channel(object):
self.active = active
self.send_method(spec.Channel.FlowOk, 'b', (active, ))
- def _x_open(self):
+ def open(self):
"""Open a channel for use
This method opens a virtual connection (a channel).
@@ -1121,6 +1119,10 @@ class Channel(object):
consumer count
"""
+ on_ready = None
+ if callback:
+ on_ready = promise()
+ on_ready.then(callback) # Need the original args passed for filter
p = self.send_method(
spec.Queue.Declare, argsig,
(0, queue, passive, durable, exclusive, auto_delete,
@@ -1130,7 +1132,7 @@ class Channel(object):
return p
return self.maybe_wait(
spec.Queue.DeclareOk, returns_tuple=True,
- filter=queue_declare_ok_t, callback=callback,
+ filter=queue_declare_ok_t, callback=on_ready,
)
def queue_delete(self, queue='', if_unused=False, if_empty=False,
@@ -1551,7 +1553,6 @@ class Channel(object):
be set to True in that case.
"""
-
on_consume_ready = promise(
self._on_consume_ready, (callback, on_cancel, no_ack, ),
)
@@ -1716,6 +1717,7 @@ class Channel(object):
The server SHOULD implement the immediate flag.
"""
+ print('BASIC PUBLISH: %r' % (msg, ))
return self.send_method(
spec.Basic.Publish, argsig,
(0, exchange, routing_key, mandatory, immediate), msg,
diff --git a/amqp/connection.py b/amqp/connection.py
index c280981..72f698d 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -59,6 +59,7 @@ LIBRARY_PROPERTIES = {
}
logger = get_logger(__name__)
+debug = logger.debug
class BaseConnection(object):
@@ -145,7 +146,7 @@ class BaseConnection(object):
ssl=False, connect_timeout=None, channel_max=None,
frame_max=None, heartbeat=0, on_open=None, on_blocked=None,
on_unblocked=None, confirm_publish=False,
- on_tune_ok=None, **kwargs):
+ on_tune_ok=None, loop=None, **kwargs):
"""Create a connection to the specified host, which should be
a 'host[:port]', such as 'localhost', or '1.2.3.4:5672'
(defaults to 'localhost', if a port is not specified then
@@ -196,6 +197,7 @@ class BaseConnection(object):
self.confirm_publish = confirm_publish
# Callbacks
+ self._blocking = deque()
self.on_blocked = on_blocked
self.on_unblocked = on_unblocked
self.on_open = ensure_promise(on_open)
@@ -216,12 +218,16 @@ class BaseConnection(object):
self.transport = self.Transport(host, ssl)
self.transport.connect(self._outbound, self._outbound_ready,
timeout=connect_timeout)
- self._frame_handler = frame_handler(self, self.on_inbound_method)
+ self.on_inbound_frame = frame_handler(self, self.on_inbound_method)
self._frame_writer = frame_writer(self, self.transport, self._outbound)
self._inbound = deque()
- self._inbound_handler = inbound_handler(self, self._inbound)
- self._read_frame = self._inbound_handler.send
- self.on_inbound_frame = self._frame_handler.send
+ self._read_frame = inbound_handler(self, self._inbound)
+ if loop:
+ self._on_loop_set(loop)
+ self.connect()
+
+ def _on_loop_set(self, loop):
+ pass
def __enter__(self):
return self
@@ -273,6 +279,7 @@ class BaseConnection(object):
1, channel.channel_id, sig, args, content, on_sent,
))
if wait:
+ print('WAIT ========= %r' % (self._maybe_wait, ))
return self.maybe_wait(channel, wait, callback)
return on_sent
@@ -337,7 +344,6 @@ class BaseConnection(object):
)
def _on_open_ok(self):
- print('CONNECTION NOW OPEN')
self.on_open(self)
def FIXME(self, *args, **kwargs):
@@ -385,10 +391,10 @@ class BaseConnection(object):
return self.channels[channel_id]
except KeyError:
p = self.Channel(self, channel_id, on_open=callback)
- print('CREATE CHANNEL: %r' % (p.channel_id, ))
self.channels[p.channel_id] = p
if callback:
p.then(callback)
+ p.open()
return p
def is_alive(self):
@@ -402,7 +408,7 @@ class BaseConnection(object):
if not self.connected:
raise ConnectionError('Connection lost')
try:
- self._read_frame(None)
+ self._read_frame()
except socket.timeout:
raise
except (socket.error, IOError) as exc:
@@ -418,20 +424,18 @@ class BaseConnection(object):
type_, channel, framelen = unpack_from('>BHI', frame)
if type_ == 1:
method_sig = unpack_from('>HH', frame, 7)
- print(
+ debug(
'METHOD: %r CHANNEL: %r LEN: %r' % (
METHOD_NAME_MAP[method_sig], channel, framelen),
)
else:
- print('FRAME: %r CHANNEL: %r LEN: %r' % (
+ debug('FRAME: %r CHANNEL: %r LEN: %r' % (
type_, channel, framelen))
def on_writable(self):
outbound = self._outbound
if outbound:
buf, borrowed, callback = outbound.popleft()
- print('WRITE: BOR: %r CB: %r -> %r' % (
- borrowed, callback, len(buf)))
self._debug_outgoing_frame(buf)
try:
bytes_sent = self.sock.send(buf)
@@ -687,28 +691,20 @@ class BaseConnection(object):
def server_capabilities(self):
return self.server_properties.get('capabilities') or {}
+ def setblocking(self, blocking):
+ print('SETBLOCKING: %r' % (blocking, ))
+ if blocking:
+ self._maybe_wait = self._wait_block
+ self.sock.setblocking(1)
+ else:
+ self._maybe_wait = self._wait_async
+ self.sock.setblocking(0)
-class AsynConnection(BaseConnection):
-
- def connect(self):
- pass
-
- def _maybe_wait(self, channel, method, callback, returns_tuple, filter):
+ def _wait_async(self, channel, method, callback, returns_tuple, filter):
return channel.events.call_on(method, callback)
-Thenable.register(AsynConnection)
-
-class BlockingConnection(BaseConnection):
-
- def __init__(self, *args, **kwargs):
- self._blocking = deque()
- super(BlockingConnection, self).__init__(*args, **kwargs)
-
- def connect(self):
- while not self.on_open.ready:
- self.drain_events()
-
- def _maybe_wait(self, channel, method, callback, returns_tuple, filter):
+ def _wait_block(self, channel, method, callback, returns_tuple, filter):
+ from amqp.exceptions import METHOD_NAME_MAP
with channel.events.save_and_set_pending_for(method, callback):
while not callback.ready:
self.drain_events()
@@ -721,8 +717,11 @@ class BlockingConnection(BaseConnection):
return filter(args[0]) if filter else args[0]
def drain_events(self, timeout=None):
+ print('DRAIN EVENTS')
+ #import traceback
+ #traceback.print_stack()
if timeout is None:
- self._read_frame(None)
+ self._read_frame()
while self._inbound:
self.on_inbound_frame(self._inbound.popleft())
self._call_pending_blocking_callbacks()
@@ -735,7 +734,7 @@ class BlockingConnection(BaseConnection):
sock.settimeout(timeout)
try:
try:
- self._read_frame(None)
+ self._read_frame()
except SSLError as exc:
# http://bugs.python.org/issue10272
if 'timed out' in str(exc):
@@ -760,6 +759,31 @@ class BlockingConnection(BaseConnection):
while self._blocking:
callback = self._blocking.popleft()
callback()
+
+ def register_with_event_loop(self, loop):
+ print('REGISTER WITH EVENT LOOP: %r' % (loop, ))
+ self.loop = loop
+ self.setblocking(0)
+ loop.add_reader(self.sock, self.on_readable)
+
+
+class AsynConnection(BaseConnection):
+ _maybe_wait = BaseConnection._wait_async
+
+ def connect(self):
+ pass
+
+ def _on_loop_set(self, loop):
+ self.register_with_event_loop(loop)
+Thenable.register(AsynConnection)
+
+
+class BlockingConnection(BaseConnection):
+ _maybe_wait = BaseConnection._wait_block
+
+ def connect(self):
+ while not self.on_open.ready:
+ self.drain_events()
Thenable.register(BlockingConnection)
-Connection = AsynConnection
+Connection = BlockingConnection
diff --git a/amqp/events.py b/amqp/events.py
index d69b577..af8b668 100644
--- a/amqp/events.py
+++ b/amqp/events.py
@@ -58,6 +58,8 @@ class Events(object):
get_listener = self._listeners.__getitem__
reserve_pending = self._pending.pop
+ from amqp.exceptions import METHOD_NAME_MAP
+
def dispatch(method_sig, payload, content):
try:
amqp_method = get_method(method_sig)
@@ -83,7 +85,6 @@ class Events(object):
args.append(content)
for callback in callbacks:
- print('CALLING: %r' % (callback, ))
callback(*args)
return dispatch
diff --git a/amqp/method_framing.py b/amqp/method_framing.py
index b7e17fb..fa29721 100644
--- a/amqp/method_framing.py
+++ b/amqp/method_framing.py
@@ -46,13 +46,13 @@ _CONTENT_METHODS = frozenset([
])
-@coro
def frame_handler(connection, callback,
unpack_from=unpack_from, content_methods=_CONTENT_METHODS):
expected_types = defaultdict(lambda: 1)
partial_messages = {}
- while 1:
- frame_type, channel, buf = yield
+
+ def on_frame(frame):
+ frame_type, channel, buf = frame
connection.bytes_recv += 1
if frame_type not in (expected_types[channel], 8):
raise UnexpectedFrame(
@@ -96,6 +96,8 @@ def frame_handler(connection, callback,
# bytes_recv already updated
pass
+ return on_frame
+
@coro
def frame_writer(connection, transport, outbound,
@@ -198,7 +200,6 @@ def frame_writer(connection, transport, outbound,
3, channel, framelen, body, 0xce)
offset += 8 + framelen
- #print('FRAME: %r' % (view[:offset].tobytes(), ))
write((view[:offset], borrowed, callback))
outbound_ready()
@@ -243,17 +244,15 @@ class Buffer(object):
return self.size
-@coro
def inbound_handler(conn, frames, bufsize=2 ** 19, readsize=2 ** 17,
error=socket.error, unpack_from=unpack_from):
recv_into = conn.sock.recv_into
recv = conn.sock.recv
- need = 8
+ need = [8]
buffer = Buffer(bufsize)
- while 1:
- _ = (yield) # noqa
+ def on_readable():
try:
R = recv(readsize)
bytes_read = len(R)
@@ -267,22 +266,19 @@ def inbound_handler(conn, frames, bufsize=2 ** 19, readsize=2 ** 17,
buffer.write(R)
data = buffer.read()
bytes_have = len(data)
- if bytes_have >= need:
+ if bytes_have >= need[0]:
frame_offset = 0
- while bytes_have - frame_offset >= need:
+ while bytes_have - frame_offset >= need[0]:
frame_start_offset = frame_offset
frame_type, channel, size = unpack_from(
'>BHI', data, frame_offset,
)
frame_offset += 7
if bytes_have - frame_start_offset < size + 8:
- need, frame_offset = 8 + size, frame_start_offset
+ need[0], frame_offset = 8 + size, frame_start_offset
break
- print('FRAME OFFSET + SIZE: %r' % (frame_offset + size, ))
- print('OFFSET: %r SIZE: %r DATA: %r' % (buffer.offset,
- buffer.size, len(data)))
assert data[frame_offset + size] == '\xCE'
- need = 8
+ need[0] = 8
frames.append((
frame_type, channel,
data[frame_offset:frame_offset + size],
@@ -290,3 +286,4 @@ def inbound_handler(conn, frames, bufsize=2 ** 19, readsize=2 ** 17,
frame_offset += size + 1
assert frame_offset == frame_start_offset + 8 + size
buffer.consume(frame_offset)
+ return on_readable
diff --git a/amqp/promise.py b/amqp/promise.py
index 2e1d337..9540813 100644
--- a/amqp/promise.py
+++ b/amqp/promise.py
@@ -295,6 +295,12 @@ class promise(object):
raise
raise exc
+ @property
+ def listeners(self):
+ if self._lvpending:
+ return self._lvpending
+ return self._svpending
+
def throw(self, exc=None):
if exc is None:
return self.set_error_state()