diff options
author | Ask Solem <ask@celeryproject.org> | 2014-07-22 20:18:22 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2014-07-22 20:18:22 +0100 |
commit | 48c5f38aad069174f629c5b9fa56fe2de56ec334 (patch) | |
tree | f0e40c7798dc2f30d7c20af1a2a7b48802cb8c6d | |
parent | 84a9b0b4fcfa53590164b6829105d839c9888a65 (diff) | |
download | kombu-callbacks.tar.gz |
More workcallbacks
-rw-r--r-- | kombu/connection.py | 3 | ||||
-rw-r--r-- | kombu/messaging.py | 31 | ||||
-rw-r--r-- | kombu/pidbox.py | 18 | ||||
-rw-r--r-- | kombu/transport/pyamqp.py | 5 | ||||
-rw-r--r-- | kombu/utils/eventio.py | 2 |
5 files changed, 35 insertions, 24 deletions
diff --git a/kombu/connection.py b/kombu/connection.py index ce0b4fc4..8b460df2 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -139,7 +139,8 @@ class Connection(object): ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy='round-robin', - alternates=None, **kwargs): + alternates=None, loop=None, **kwargs): + self.loop = loop alt = [] if alternates is None else alternates # have to spell the args out, just to get nice docstrings :( params = self._initial_params = { diff --git a/kombu/messaging.py b/kombu/messaging.py index 5aea59c2..e2bd0fa3 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -217,6 +217,7 @@ class Producer(object): return callback def _publish(self, mtup): + print('PUBLISH1: %r' % (mtup, )) channel = self._channel if isinstance(channel, ChannelPromise): return self.__connection__.then(promise( @@ -251,29 +252,32 @@ class Producer(object): content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare, callback, channel): + print('>>>>>>>>>>>>>>>> PUBLISH USING CHANNEL') message = channel.prepare_message( body, priority, content_type, content_encoding, headers, properties, ) if declare: pending = list(self._declare_or_pending(declare, channel)) + print('>>>>>>>>>>>> HAVE TO DECLARE: %r' % (pending, )) if pending: return barrier(pending, promise( self._on_publish_declared, (channel, message, exchange, routing_key, mandatory, immediate, callback) )) - else: - return self._on_publish_declared( - channel, message, exchange, routing_key, - mandatory, immediate, callback, - ) + return self._on_publish_declared( + channel, message, exchange, routing_key, + mandatory, immediate, callback, + ) def _on_entity_declared(self, entity): + print('DECLARED: %r' % (entity, )) self._declaring.pop(hash(entity), None) def _on_publish_declared(self, channel, message, exchange, routing_key, mandatory, immediate, callback): + print('ACTUAL PUBLISH') return channel.basic_publish( message, exchange=exchange, routing_key=routing_key, @@ -441,9 +445,6 @@ class Consumer(object): self.prefetch_global = prefetch_global self.on_ready = ensure_promise(on_ready) - if self.channel: - self._start() - def then(self, on_success, on_failure=None): return self.on_ready.then(on_success, on_failure) @@ -451,12 +452,14 @@ class Consumer(object): """Revive consumer after connection loss.""" self._on_channel_open(None, maybe_channel(channel)) - def _start(self, prefetch_size=None, - prefetch_count=None, prefetch_global=False): + def consume(self, no_ack=None): + if no_ack is not None: + self.no_ack = no_ack self._stopping = False self.channel.then(self._on_connected) def _on_connected(self, channel): + print('ON CONNECTED: %r' % (channel, )) if getattr(channel, 'channel_id', None): return self._on_channel_open(channel) return channel.default_channel.then( @@ -464,6 +467,7 @@ class Consumer(object): ) def _on_channel_open(self, channel): + print('ON CHANNEL OPEN: %r' % (channel, )) self._active_tags.clear() self.channel = channel self.queues = [queue(self.channel) @@ -476,6 +480,7 @@ class Consumer(object): return self._on_queues_declared() def _on_queues_declared(self): + print('QUEUES NOW DECLARED') if not self.no_ack and (self.prefetch_size or self.prefetch_count): return self.qos( self.prefetch_size, self.prefetch_count, self.prefetch_global, @@ -484,7 +489,8 @@ class Consumer(object): return self._on_qos_applied() def _on_qos_applied(self, consumer_=None): - self.consume(callback=self.on_ready) + print('QOS NOW APPLIED') + self._do_consume(callback=self.on_ready) def stop(self, callback=None, close_channel=False): if not self._stopping: @@ -564,7 +570,7 @@ class Consumer(object): """ return self.add_queue(Queue.from_dict(queue, **options)) - def consume(self, no_ack=None, on_sent=None, callback=None): + def _do_consume(self, no_ack=None, on_sent=None, callback=None): """Start consuming messages. Can be called multiple times, but note that while it @@ -717,6 +723,7 @@ class Consumer(object): def _basic_consume(self, queue, consumer_tag=None, no_ack=no_ack, nowait=True, on_sent=None, on_reply=None): + print('CONSUME FROM: %r' % (queue, )) tag = self._active_tags.get(queue.name) if tag is None: tag = self._add_tag(queue, consumer_tag) diff --git a/kombu/pidbox.py b/kombu/pidbox.py index 25541e1b..73b59673 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -16,7 +16,7 @@ from itertools import count from threading import local from time import time -from . import Exchange, Queue, Consumer, Producer +from . import Exchange, Queue, Connection, Consumer, Producer from .clocks import LamportClock from .common import maybe_declare, oid_from from .exceptions import InconsistencyError @@ -231,7 +231,7 @@ class Mailbox(object): def _publish_reply(self, reply, exchange, routing_key, ticket, channel=None, **opts): - chan = channel or self.connection.default_channel + chan = channel or self.connection exchange = Exchange(exchange, exchange_type='direct', delivery_mode='transient', durable=False) @@ -253,10 +253,11 @@ class Mailbox(object): message = {'method': type, 'arguments': arguments, 'destination': destination} - chan = channel or self.connection.default_channel + chan = channel or self.connection exchange = self.exchange + declare = [exchange] if reply_ticket: - maybe_declare(self.reply_queue(channel)) + declare.append(self.reply_queue) message.update(ticket=reply_ticket, reply_to={'exchange': self.reply_exchange.name, 'routing_key': self.oid}) @@ -280,7 +281,7 @@ class Mailbox(object): arguments = arguments or {} reply_ticket = reply and uuid() or None - chan = channel or self.connection.default_channel + chan = channel or self.connection # Set reply limit to number of destinations (if specified) if limit is None and destination: @@ -304,7 +305,7 @@ class Mailbox(object): channel=None, accept=None): if accept is None: accept = self.accept - chan = channel or self.connection.default_channel + chan = channel or self.connection queue = self.reply_queue consumer = Consumer(channel, [queue], accept=accept, no_ack=True) responses = [] @@ -341,7 +342,10 @@ class Mailbox(object): break return responses finally: - chan.after_reply_message_received(queue.name) + if isinstance(chan, Connection): + chan = chan._default_channel + if chan: + chan.after_reply_message_received(queue.name) def _get_exchange(self, namespace, type): return Exchange(self.exchange_fmt % namespace, diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py index 2c821141..1934cb7b 100644 --- a/kombu/transport/pyamqp.py +++ b/kombu/transport/pyamqp.py @@ -108,6 +108,7 @@ class Transport(base.Transport): 'ssl': conninfo.ssl, 'connect_timeout': conninfo.connect_timeout, 'heartbeat': conninfo.heartbeat, + 'loop': conninfo.loop, }, **conninfo.transport_options or {}) conn = self.Connection(**opts) conn.client = self.client @@ -125,9 +126,7 @@ class Transport(base.Transport): return connection.heartbeat def register_with_event_loop(self, connection, loop): - connection.loop = loop - connection.sock.setblocking(0) - loop.add_reader(connection.sock, connection.on_readable) + connection.register_with_event_loop(loop) def heartbeat_check(self, connection, rate=2): return connection.heartbeat_tick(rate=rate) diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index 6ed352d3..2019a519 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -63,7 +63,7 @@ class Poller(object): try: return self._poll(timeout) except Exception as exc: - if exc.errno != errno.EINTR: + if getattr(exc, 'errno', None) != errno.EINTR: raise |