summaryrefslogtreecommitdiff
path: root/kombu/transport
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/transport')
-rw-r--r--kombu/transport/amqplib.py9
-rw-r--r--kombu/transport/base.py34
-rw-r--r--kombu/transport/librabbitmq.py6
-rw-r--r--kombu/transport/pyamqp.py3
-rw-r--r--kombu/transport/redis.py22
-rw-r--r--kombu/transport/zmq.py20
6 files changed, 59 insertions, 35 deletions
diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py
index 545dd3a6..6a1efeef 100644
--- a/kombu/transport/amqplib.py
+++ b/kombu/transport/amqplib.py
@@ -311,14 +311,13 @@ class Transport(base.Transport):
)
channel_errors = base.Transport.channel_errors + (AMQPChannelException, )
- nb_keep_draining = True
- driver_name = "amqplib"
- driver_type = "amqp"
+ driver_name = 'amqplib'
+ driver_type = 'amqp'
supports_ev = True
def __init__(self, client, **kwargs):
self.client = client
- self.default_port = kwargs.get("default_port") or self.default_port
+ self.default_port = kwargs.get('default_port') or self.default_port
def create_channel(self, connection):
return connection.channel()
@@ -370,7 +369,7 @@ class Transport(base.Transport):
def register_with_event_loop(self, connection, loop):
loop.add_reader(connection.method_reader.source.sock,
- self.client.drain_nowait_all)
+ self.on_readable, connection, loop)
@property
def default_connection_params(self):
diff --git a/kombu/transport/base.py b/kombu/transport/base.py
index 330d005d..8556db55 100644
--- a/kombu/transport/base.py
+++ b/kombu/transport/base.py
@@ -7,9 +7,13 @@ Base transport interface.
"""
from __future__ import absolute_import
+import errno
+import socket
+
from kombu.exceptions import ChannelError, ConnectionError
from kombu.message import Message
from kombu.utils import cached_property
+from kombu.utils.compat import get_errno
__all__ = ['Message', 'StdChannel', 'Management', 'Transport']
@@ -71,10 +75,6 @@ class Transport(object):
#: Tuple of errors that can happen due to channel/method failure.
channel_errors = (ChannelError, )
- #: For non-blocking use, an eventloop should keep
- #: draining events as long as ``connection.more_to_read`` is True.
- nb_keep_draining = False
-
#: Type of driver, can be used to separate transports
#: using the AMQP protocol (driver_type: 'amqp'),
#: Redis (driver_type: 'redis'), etc...
@@ -90,6 +90,8 @@ class Transport(object):
#: Set to true if the transport supports the AIO interface.
supports_ev = False
+ __reader = None
+
def __init__(self, client, **kwargs):
self.client = client
@@ -120,6 +122,30 @@ class Transport(object):
def verify_connection(self, connection):
return True
+ def _reader(self, connection, timeout=socket.timeout, error=socket.error,
+ get_errno=get_errno, _unavail=(errno.EAGAIN, errno.EINTR)):
+ drain_events = connection.drain_events
+ while 1:
+ try:
+ yield drain_events(timeout=0)
+ except timeout:
+ break
+ except error as exc:
+ if get_errno(exc) in _unavail:
+ break
+ raise
+
+ def on_readable(self, connection, loop):
+ reader = self.__reader
+ if reader is None:
+ reader = self.__reader = self._reader(connection)
+ try:
+ next(reader)
+ except StopIteration:
+ reader = self.__reader = self._reader(connection)
+ next(reader, None)
+ loop.on_tick.add(reader)
+
@property
def default_connection_params(self):
return {}
diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py
index 54ea918f..8fe06968 100644
--- a/kombu/transport/librabbitmq.py
+++ b/kombu/transport/librabbitmq.py
@@ -83,11 +83,11 @@ class Transport(base.Transport):
driver_name = 'librabbitmq'
supports_ev = True
- nb_keep_draining = True
def __init__(self, client, **kwargs):
self.client = client
self.default_port = kwargs.get('default_port') or self.default_port
+ self.__reader = None
def driver_version(self):
return amqp.__version__
@@ -143,7 +143,9 @@ class Transport(base.Transport):
return connection.connected
def register_with_event_loop(self, connection, loop):
- loop.add_reader(connection.fileno(), self.client.drain_nowait_all)
+ loop.add_reader(
+ connection.fileno(), self.on_readable, connection, loop,
+ )
def get_manager(self, *args, **kwargs):
return get_manager(self.client, *args, **kwargs)
diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py
index 83529568..e8bde415 100644
--- a/kombu/transport/pyamqp.py
+++ b/kombu/transport/pyamqp.py
@@ -71,7 +71,6 @@ class Transport(base.Transport):
amqp.Connection.recoverable_connection_errors
recoverable_channel_errors = amqp.Connection.recoverable_channel_errors
- nb_keep_draining = True
driver_name = 'py-amqp'
driver_type = 'amqp'
supports_heartbeats = True
@@ -119,7 +118,7 @@ class Transport(base.Transport):
connection.close()
def register_with_event_loop(self, connection, loop):
- loop.add_reader(connection.sock, self.client.drain_nowait_all)
+ loop.add_reader(connection.sock, self.on_readable, connection, loop)
def heartbeat_check(self, connection, rate=2):
return connection.heartbeat_tick(rate=rate)
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 2687e9e4..80372411 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -124,7 +124,7 @@ class QoS(virtual.QoS):
def reject(self, delivery_tag, requeue=False):
if requeue:
- self.restore_by_tag(tag, leftmost=True)
+ self.restore_by_tag(delivery_tag, leftmost=True)
self.ack(delivery_tag)
@contextmanager
@@ -271,11 +271,14 @@ class MultiChannelPoller(object):
num=channel.unacked_restore_limit,
)
+ def on_readable(self, fileno):
+ chan, type = self._fd_to_chan[fileno]
+ if chan.qos.can_consume():
+ return chan.handlers[type]()
+
def handle_event(self, fileno, event):
if event & READ:
- chan, type = self._fd_to_chan[fileno]
- if chan.qos.can_consume():
- return chan.handlers[type](), self
+ return self.on_readable(fileno), self
elif event & ERR:
chan, type = self._fd_to_chan[fileno]
chan._poll_error(type)
@@ -763,19 +766,18 @@ class Transport(virtual.Transport):
cycle.on_poll_init(loop.poller)
cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
- handle_event = self.handle_event
+ on_readable = self.on_readable
def on_poll_start():
cycle_poll_start()
- [add_reader(fd, handle_event) for fd in cycle.fds]
+ [add_reader(fd, on_readable, fd) for fd in cycle.fds]
loop.on_tick.add(on_poll_start)
loop.call_repeatedly(10, cycle.maybe_restore_messages)
- def handle_event(self, fileno, event):
+ def on_readable(self, fileno):
"""Handle AIO event for one of our file descriptors."""
- ret = self.cycle.handle_event(fileno, event)
- if ret:
- item, channel = ret
+ item = self.cycle.on_readable(fileno)
+ if item:
message, queue = item
if not queue or queue not in self._callbacks:
raise KeyError(
diff --git a/kombu/transport/zmq.py b/kombu/transport/zmq.py
index 7a33acb0..e23b984e 100644
--- a/kombu/transport/zmq.py
+++ b/kombu/transport/zmq.py
@@ -71,16 +71,16 @@ class MultiChannelPoller(object):
for channel in self._channels:
self._register(channel)
- def handle_event(self, fileno, event):
+ def on_readable(self, fileno):
chan = self._fd_to_chan[fileno]
- return (chan.drain_events(), chan)
+ return chan.drain_events(), chan
def get(self, timeout=None):
self.on_poll_start()
events = self.poller.poll(timeout)
- for fileno, event in events or []:
- return self.handle_event(fileno, event)
+ for fileno, _ in events or []:
+ return self.on_readable(fileno)
raise Empty()
@@ -238,7 +238,6 @@ class Transport(virtual.Transport):
supports_ev = True
polling_interval = None
- nb_keep_draining = True
def __init__(self, *args, **kwargs):
if zmq is None:
@@ -253,21 +252,18 @@ class Transport(virtual.Transport):
cycle = self.cycle
cycle.poller = loop.poller
add_reader = loop.add_reader
- handle_event = self.handle_event
+ on_readable = self.on_readable
cycle_poll_start = cycle.on_poll_start
def on_poll_start():
cycle_poll_start()
- [add_reader(fd, handle_event) for fd in cycle.fds]
- for fd in cycle.fds:
- add_reader(fd, handle_event)
+ [add_reader(fd, on_readable, fd) for fd in cycle.fds]
loop.on_tick.add(on_poll_start)
- def handle_event(self, fileno, event):
- evt = self.cycle.handle_event(fileno, event)
- self._handle_event(evt)
+ def on_readable(self, fileno):
+ self._handle_event(self.cycle.on_readable(fileno))
def drain_events(self, connection, timeout=None):
more_to_read = False