diff options
author | Ask Solem <ask@celeryproject.org> | 2014-07-22 20:18:48 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2014-07-22 20:18:48 +0100 |
commit | 398a521e794831d53f2dd8594fa71046cb2df6c0 (patch) | |
tree | f58374a8b4253f0d859bd57196fb9ee9756bebc5 | |
parent | 9881734d2cd5b2f0d0f9b9df158dd2d9d20e69c7 (diff) | |
download | py-amqp-callbacks.tar.gz |
More workcallbacks
-rw-r--r-- | amqp/__init__.py | 10 | ||||
-rw-r--r-- | amqp/channel.py | 14 | ||||
-rw-r--r-- | amqp/connection.py | 92 | ||||
-rw-r--r-- | amqp/events.py | 3 | ||||
-rw-r--r-- | amqp/method_framing.py | 27 | ||||
-rw-r--r-- | amqp/promise.py | 6 |
6 files changed, 92 insertions, 60 deletions
diff --git a/amqp/__init__.py b/amqp/__init__.py index 38546e6..75d98cb 100644 --- a/amqp/__init__.py +++ b/amqp/__init__.py @@ -29,10 +29,10 @@ __docformat__ = 'restructuredtext' # # Pull in the public items from the various sub-modules # -from .basic_message import Message # noqa +from .basic_message import Message # noqa from .channel import Channel # noqa -from .connection import Connection # noqa -from .exceptions import ( # noqa +from .connection import AsynConnection, BlockingConnection, Connection # noqa +from .exceptions import ( # noqa AMQPError, ConnectionError, RecoverableConnectionError, @@ -61,9 +61,11 @@ from .exceptions import ( # noqa error_for_code, __all__ as _all_exceptions, ) -from .promise import promise, barrier, maybe_promise # noqa +from .promise import promise, barrier, maybe_promise # noqa __all__ = [ + 'AsynConnection', + 'BlockingConnection', 'Connection', 'Channel', 'Message', diff --git a/amqp/channel.py b/amqp/channel.py index 35bd77a..1a7a3ac 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -125,8 +125,6 @@ class Channel(object): if self.connection.confirm_publish: self.basic_publish = self.basic_publish_confirm - self._x_open() - def then(self, on_success, on_error=None): return self.on_open.then(on_success, on_error) @@ -181,7 +179,7 @@ class Channel(object): def _do_revive(self): self.is_open = False - self._x_open() + self.open() def close(self, reply_code=0, reply_text='', method_sig=(0, 0), argsig='BsBB', on_sent=None, callback=None): @@ -367,7 +365,7 @@ class Channel(object): self.active = active self.send_method(spec.Channel.FlowOk, 'b', (active, )) - def _x_open(self): + def open(self): """Open a channel for use This method opens a virtual connection (a channel). @@ -1121,6 +1119,10 @@ class Channel(object): consumer count """ + on_ready = None + if callback: + on_ready = promise() + on_ready.then(callback) # Need the original args passed for filter p = self.send_method( spec.Queue.Declare, argsig, (0, queue, passive, durable, exclusive, auto_delete, @@ -1130,7 +1132,7 @@ class Channel(object): return p return self.maybe_wait( spec.Queue.DeclareOk, returns_tuple=True, - filter=queue_declare_ok_t, callback=callback, + filter=queue_declare_ok_t, callback=on_ready, ) def queue_delete(self, queue='', if_unused=False, if_empty=False, @@ -1551,7 +1553,6 @@ class Channel(object): be set to True in that case. """ - on_consume_ready = promise( self._on_consume_ready, (callback, on_cancel, no_ack, ), ) @@ -1716,6 +1717,7 @@ class Channel(object): The server SHOULD implement the immediate flag. """ + print('BASIC PUBLISH: %r' % (msg, )) return self.send_method( spec.Basic.Publish, argsig, (0, exchange, routing_key, mandatory, immediate), msg, diff --git a/amqp/connection.py b/amqp/connection.py index c280981..72f698d 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -59,6 +59,7 @@ LIBRARY_PROPERTIES = { } logger = get_logger(__name__) +debug = logger.debug class BaseConnection(object): @@ -145,7 +146,7 @@ class BaseConnection(object): ssl=False, connect_timeout=None, channel_max=None, frame_max=None, heartbeat=0, on_open=None, on_blocked=None, on_unblocked=None, confirm_publish=False, - on_tune_ok=None, **kwargs): + on_tune_ok=None, loop=None, **kwargs): """Create a connection to the specified host, which should be a 'host[:port]', such as 'localhost', or '1.2.3.4:5672' (defaults to 'localhost', if a port is not specified then @@ -196,6 +197,7 @@ class BaseConnection(object): self.confirm_publish = confirm_publish # Callbacks + self._blocking = deque() self.on_blocked = on_blocked self.on_unblocked = on_unblocked self.on_open = ensure_promise(on_open) @@ -216,12 +218,16 @@ class BaseConnection(object): self.transport = self.Transport(host, ssl) self.transport.connect(self._outbound, self._outbound_ready, timeout=connect_timeout) - self._frame_handler = frame_handler(self, self.on_inbound_method) + self.on_inbound_frame = frame_handler(self, self.on_inbound_method) self._frame_writer = frame_writer(self, self.transport, self._outbound) self._inbound = deque() - self._inbound_handler = inbound_handler(self, self._inbound) - self._read_frame = self._inbound_handler.send - self.on_inbound_frame = self._frame_handler.send + self._read_frame = inbound_handler(self, self._inbound) + if loop: + self._on_loop_set(loop) + self.connect() + + def _on_loop_set(self, loop): + pass def __enter__(self): return self @@ -273,6 +279,7 @@ class BaseConnection(object): 1, channel.channel_id, sig, args, content, on_sent, )) if wait: + print('WAIT ========= %r' % (self._maybe_wait, )) return self.maybe_wait(channel, wait, callback) return on_sent @@ -337,7 +344,6 @@ class BaseConnection(object): ) def _on_open_ok(self): - print('CONNECTION NOW OPEN') self.on_open(self) def FIXME(self, *args, **kwargs): @@ -385,10 +391,10 @@ class BaseConnection(object): return self.channels[channel_id] except KeyError: p = self.Channel(self, channel_id, on_open=callback) - print('CREATE CHANNEL: %r' % (p.channel_id, )) self.channels[p.channel_id] = p if callback: p.then(callback) + p.open() return p def is_alive(self): @@ -402,7 +408,7 @@ class BaseConnection(object): if not self.connected: raise ConnectionError('Connection lost') try: - self._read_frame(None) + self._read_frame() except socket.timeout: raise except (socket.error, IOError) as exc: @@ -418,20 +424,18 @@ class BaseConnection(object): type_, channel, framelen = unpack_from('>BHI', frame) if type_ == 1: method_sig = unpack_from('>HH', frame, 7) - print( + debug( 'METHOD: %r CHANNEL: %r LEN: %r' % ( METHOD_NAME_MAP[method_sig], channel, framelen), ) else: - print('FRAME: %r CHANNEL: %r LEN: %r' % ( + debug('FRAME: %r CHANNEL: %r LEN: %r' % ( type_, channel, framelen)) def on_writable(self): outbound = self._outbound if outbound: buf, borrowed, callback = outbound.popleft() - print('WRITE: BOR: %r CB: %r -> %r' % ( - borrowed, callback, len(buf))) self._debug_outgoing_frame(buf) try: bytes_sent = self.sock.send(buf) @@ -687,28 +691,20 @@ class BaseConnection(object): def server_capabilities(self): return self.server_properties.get('capabilities') or {} + def setblocking(self, blocking): + print('SETBLOCKING: %r' % (blocking, )) + if blocking: + self._maybe_wait = self._wait_block + self.sock.setblocking(1) + else: + self._maybe_wait = self._wait_async + self.sock.setblocking(0) -class AsynConnection(BaseConnection): - - def connect(self): - pass - - def _maybe_wait(self, channel, method, callback, returns_tuple, filter): + def _wait_async(self, channel, method, callback, returns_tuple, filter): return channel.events.call_on(method, callback) -Thenable.register(AsynConnection) - -class BlockingConnection(BaseConnection): - - def __init__(self, *args, **kwargs): - self._blocking = deque() - super(BlockingConnection, self).__init__(*args, **kwargs) - - def connect(self): - while not self.on_open.ready: - self.drain_events() - - def _maybe_wait(self, channel, method, callback, returns_tuple, filter): + def _wait_block(self, channel, method, callback, returns_tuple, filter): + from amqp.exceptions import METHOD_NAME_MAP with channel.events.save_and_set_pending_for(method, callback): while not callback.ready: self.drain_events() @@ -721,8 +717,11 @@ class BlockingConnection(BaseConnection): return filter(args[0]) if filter else args[0] def drain_events(self, timeout=None): + print('DRAIN EVENTS') + #import traceback + #traceback.print_stack() if timeout is None: - self._read_frame(None) + self._read_frame() while self._inbound: self.on_inbound_frame(self._inbound.popleft()) self._call_pending_blocking_callbacks() @@ -735,7 +734,7 @@ class BlockingConnection(BaseConnection): sock.settimeout(timeout) try: try: - self._read_frame(None) + self._read_frame() except SSLError as exc: # http://bugs.python.org/issue10272 if 'timed out' in str(exc): @@ -760,6 +759,31 @@ class BlockingConnection(BaseConnection): while self._blocking: callback = self._blocking.popleft() callback() + + def register_with_event_loop(self, loop): + print('REGISTER WITH EVENT LOOP: %r' % (loop, )) + self.loop = loop + self.setblocking(0) + loop.add_reader(self.sock, self.on_readable) + + +class AsynConnection(BaseConnection): + _maybe_wait = BaseConnection._wait_async + + def connect(self): + pass + + def _on_loop_set(self, loop): + self.register_with_event_loop(loop) +Thenable.register(AsynConnection) + + +class BlockingConnection(BaseConnection): + _maybe_wait = BaseConnection._wait_block + + def connect(self): + while not self.on_open.ready: + self.drain_events() Thenable.register(BlockingConnection) -Connection = AsynConnection +Connection = BlockingConnection diff --git a/amqp/events.py b/amqp/events.py index d69b577..af8b668 100644 --- a/amqp/events.py +++ b/amqp/events.py @@ -58,6 +58,8 @@ class Events(object): get_listener = self._listeners.__getitem__ reserve_pending = self._pending.pop + from amqp.exceptions import METHOD_NAME_MAP + def dispatch(method_sig, payload, content): try: amqp_method = get_method(method_sig) @@ -83,7 +85,6 @@ class Events(object): args.append(content) for callback in callbacks: - print('CALLING: %r' % (callback, )) callback(*args) return dispatch diff --git a/amqp/method_framing.py b/amqp/method_framing.py index b7e17fb..fa29721 100644 --- a/amqp/method_framing.py +++ b/amqp/method_framing.py @@ -46,13 +46,13 @@ _CONTENT_METHODS = frozenset([ ]) -@coro def frame_handler(connection, callback, unpack_from=unpack_from, content_methods=_CONTENT_METHODS): expected_types = defaultdict(lambda: 1) partial_messages = {} - while 1: - frame_type, channel, buf = yield + + def on_frame(frame): + frame_type, channel, buf = frame connection.bytes_recv += 1 if frame_type not in (expected_types[channel], 8): raise UnexpectedFrame( @@ -96,6 +96,8 @@ def frame_handler(connection, callback, # bytes_recv already updated pass + return on_frame + @coro def frame_writer(connection, transport, outbound, @@ -198,7 +200,6 @@ def frame_writer(connection, transport, outbound, 3, channel, framelen, body, 0xce) offset += 8 + framelen - #print('FRAME: %r' % (view[:offset].tobytes(), )) write((view[:offset], borrowed, callback)) outbound_ready() @@ -243,17 +244,15 @@ class Buffer(object): return self.size -@coro def inbound_handler(conn, frames, bufsize=2 ** 19, readsize=2 ** 17, error=socket.error, unpack_from=unpack_from): recv_into = conn.sock.recv_into recv = conn.sock.recv - need = 8 + need = [8] buffer = Buffer(bufsize) - while 1: - _ = (yield) # noqa + def on_readable(): try: R = recv(readsize) bytes_read = len(R) @@ -267,22 +266,19 @@ def inbound_handler(conn, frames, bufsize=2 ** 19, readsize=2 ** 17, buffer.write(R) data = buffer.read() bytes_have = len(data) - if bytes_have >= need: + if bytes_have >= need[0]: frame_offset = 0 - while bytes_have - frame_offset >= need: + while bytes_have - frame_offset >= need[0]: frame_start_offset = frame_offset frame_type, channel, size = unpack_from( '>BHI', data, frame_offset, ) frame_offset += 7 if bytes_have - frame_start_offset < size + 8: - need, frame_offset = 8 + size, frame_start_offset + need[0], frame_offset = 8 + size, frame_start_offset break - print('FRAME OFFSET + SIZE: %r' % (frame_offset + size, )) - print('OFFSET: %r SIZE: %r DATA: %r' % (buffer.offset, - buffer.size, len(data))) assert data[frame_offset + size] == '\xCE' - need = 8 + need[0] = 8 frames.append(( frame_type, channel, data[frame_offset:frame_offset + size], @@ -290,3 +286,4 @@ def inbound_handler(conn, frames, bufsize=2 ** 19, readsize=2 ** 17, frame_offset += size + 1 assert frame_offset == frame_start_offset + 8 + size buffer.consume(frame_offset) + return on_readable diff --git a/amqp/promise.py b/amqp/promise.py index 2e1d337..9540813 100644 --- a/amqp/promise.py +++ b/amqp/promise.py @@ -295,6 +295,12 @@ class promise(object): raise raise exc + @property + def listeners(self): + if self._lvpending: + return self._lvpending + return self._svpending + def throw(self, exc=None): if exc is None: return self.set_error_state() |