summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-07-22 20:18:22 +0100
committerAsk Solem <ask@celeryproject.org>2014-07-22 20:18:22 +0100
commit48c5f38aad069174f629c5b9fa56fe2de56ec334 (patch)
treef0e40c7798dc2f30d7c20af1a2a7b48802cb8c6d
parent84a9b0b4fcfa53590164b6829105d839c9888a65 (diff)
downloadkombu-callbacks.tar.gz
More workcallbacks
-rw-r--r--kombu/connection.py3
-rw-r--r--kombu/messaging.py31
-rw-r--r--kombu/pidbox.py18
-rw-r--r--kombu/transport/pyamqp.py5
-rw-r--r--kombu/utils/eventio.py2
5 files changed, 35 insertions, 24 deletions
diff --git a/kombu/connection.py b/kombu/connection.py
index ce0b4fc4..8b460df2 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -139,7 +139,8 @@ class Connection(object):
ssl=False, transport=None, connect_timeout=5,
transport_options=None, login_method=None, uri_prefix=None,
heartbeat=0, failover_strategy='round-robin',
- alternates=None, **kwargs):
+ alternates=None, loop=None, **kwargs):
+ self.loop = loop
alt = [] if alternates is None else alternates
# have to spell the args out, just to get nice docstrings :(
params = self._initial_params = {
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 5aea59c2..e2bd0fa3 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -217,6 +217,7 @@ class Producer(object):
return callback
def _publish(self, mtup):
+ print('PUBLISH1: %r' % (mtup, ))
channel = self._channel
if isinstance(channel, ChannelPromise):
return self.__connection__.then(promise(
@@ -251,29 +252,32 @@ class Producer(object):
content_encoding, headers, properties,
routing_key, mandatory, immediate, exchange,
declare, callback, channel):
+ print('>>>>>>>>>>>>>>>> PUBLISH USING CHANNEL')
message = channel.prepare_message(
body, priority, content_type,
content_encoding, headers, properties,
)
if declare:
pending = list(self._declare_or_pending(declare, channel))
+ print('>>>>>>>>>>>> HAVE TO DECLARE: %r' % (pending, ))
if pending:
return barrier(pending, promise(
self._on_publish_declared,
(channel, message, exchange, routing_key, mandatory,
immediate, callback)
))
- else:
- return self._on_publish_declared(
- channel, message, exchange, routing_key,
- mandatory, immediate, callback,
- )
+ return self._on_publish_declared(
+ channel, message, exchange, routing_key,
+ mandatory, immediate, callback,
+ )
def _on_entity_declared(self, entity):
+ print('DECLARED: %r' % (entity, ))
self._declaring.pop(hash(entity), None)
def _on_publish_declared(self, channel, message, exchange, routing_key,
mandatory, immediate, callback):
+ print('ACTUAL PUBLISH')
return channel.basic_publish(
message,
exchange=exchange, routing_key=routing_key,
@@ -441,9 +445,6 @@ class Consumer(object):
self.prefetch_global = prefetch_global
self.on_ready = ensure_promise(on_ready)
- if self.channel:
- self._start()
-
def then(self, on_success, on_failure=None):
return self.on_ready.then(on_success, on_failure)
@@ -451,12 +452,14 @@ class Consumer(object):
"""Revive consumer after connection loss."""
self._on_channel_open(None, maybe_channel(channel))
- def _start(self, prefetch_size=None,
- prefetch_count=None, prefetch_global=False):
+ def consume(self, no_ack=None):
+ if no_ack is not None:
+ self.no_ack = no_ack
self._stopping = False
self.channel.then(self._on_connected)
def _on_connected(self, channel):
+ print('ON CONNECTED: %r' % (channel, ))
if getattr(channel, 'channel_id', None):
return self._on_channel_open(channel)
return channel.default_channel.then(
@@ -464,6 +467,7 @@ class Consumer(object):
)
def _on_channel_open(self, channel):
+ print('ON CHANNEL OPEN: %r' % (channel, ))
self._active_tags.clear()
self.channel = channel
self.queues = [queue(self.channel)
@@ -476,6 +480,7 @@ class Consumer(object):
return self._on_queues_declared()
def _on_queues_declared(self):
+ print('QUEUES NOW DECLARED')
if not self.no_ack and (self.prefetch_size or self.prefetch_count):
return self.qos(
self.prefetch_size, self.prefetch_count, self.prefetch_global,
@@ -484,7 +489,8 @@ class Consumer(object):
return self._on_qos_applied()
def _on_qos_applied(self, consumer_=None):
- self.consume(callback=self.on_ready)
+ print('QOS NOW APPLIED')
+ self._do_consume(callback=self.on_ready)
def stop(self, callback=None, close_channel=False):
if not self._stopping:
@@ -564,7 +570,7 @@ class Consumer(object):
"""
return self.add_queue(Queue.from_dict(queue, **options))
- def consume(self, no_ack=None, on_sent=None, callback=None):
+ def _do_consume(self, no_ack=None, on_sent=None, callback=None):
"""Start consuming messages.
Can be called multiple times, but note that while it
@@ -717,6 +723,7 @@ class Consumer(object):
def _basic_consume(self, queue, consumer_tag=None, no_ack=no_ack,
nowait=True, on_sent=None, on_reply=None):
+ print('CONSUME FROM: %r' % (queue, ))
tag = self._active_tags.get(queue.name)
if tag is None:
tag = self._add_tag(queue, consumer_tag)
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index 25541e1b..73b59673 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -16,7 +16,7 @@ from itertools import count
from threading import local
from time import time
-from . import Exchange, Queue, Consumer, Producer
+from . import Exchange, Queue, Connection, Consumer, Producer
from .clocks import LamportClock
from .common import maybe_declare, oid_from
from .exceptions import InconsistencyError
@@ -231,7 +231,7 @@ class Mailbox(object):
def _publish_reply(self, reply, exchange, routing_key, ticket,
channel=None, **opts):
- chan = channel or self.connection.default_channel
+ chan = channel or self.connection
exchange = Exchange(exchange, exchange_type='direct',
delivery_mode='transient',
durable=False)
@@ -253,10 +253,11 @@ class Mailbox(object):
message = {'method': type,
'arguments': arguments,
'destination': destination}
- chan = channel or self.connection.default_channel
+ chan = channel or self.connection
exchange = self.exchange
+ declare = [exchange]
if reply_ticket:
- maybe_declare(self.reply_queue(channel))
+ declare.append(self.reply_queue)
message.update(ticket=reply_ticket,
reply_to={'exchange': self.reply_exchange.name,
'routing_key': self.oid})
@@ -280,7 +281,7 @@ class Mailbox(object):
arguments = arguments or {}
reply_ticket = reply and uuid() or None
- chan = channel or self.connection.default_channel
+ chan = channel or self.connection
# Set reply limit to number of destinations (if specified)
if limit is None and destination:
@@ -304,7 +305,7 @@ class Mailbox(object):
channel=None, accept=None):
if accept is None:
accept = self.accept
- chan = channel or self.connection.default_channel
+ chan = channel or self.connection
queue = self.reply_queue
consumer = Consumer(channel, [queue], accept=accept, no_ack=True)
responses = []
@@ -341,7 +342,10 @@ class Mailbox(object):
break
return responses
finally:
- chan.after_reply_message_received(queue.name)
+ if isinstance(chan, Connection):
+ chan = chan._default_channel
+ if chan:
+ chan.after_reply_message_received(queue.name)
def _get_exchange(self, namespace, type):
return Exchange(self.exchange_fmt % namespace,
diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py
index 2c821141..1934cb7b 100644
--- a/kombu/transport/pyamqp.py
+++ b/kombu/transport/pyamqp.py
@@ -108,6 +108,7 @@ class Transport(base.Transport):
'ssl': conninfo.ssl,
'connect_timeout': conninfo.connect_timeout,
'heartbeat': conninfo.heartbeat,
+ 'loop': conninfo.loop,
}, **conninfo.transport_options or {})
conn = self.Connection(**opts)
conn.client = self.client
@@ -125,9 +126,7 @@ class Transport(base.Transport):
return connection.heartbeat
def register_with_event_loop(self, connection, loop):
- connection.loop = loop
- connection.sock.setblocking(0)
- loop.add_reader(connection.sock, connection.on_readable)
+ connection.register_with_event_loop(loop)
def heartbeat_check(self, connection, rate=2):
return connection.heartbeat_tick(rate=rate)
diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py
index 6ed352d3..2019a519 100644
--- a/kombu/utils/eventio.py
+++ b/kombu/utils/eventio.py
@@ -63,7 +63,7 @@ class Poller(object):
try:
return self._poll(timeout)
except Exception as exc:
- if exc.errno != errno.EINTR:
+ if getattr(exc, 'errno', None) != errno.EINTR:
raise