diff options
Diffstat (limited to 'kombu/transport')
-rw-r--r-- | kombu/transport/amqplib.py | 9 | ||||
-rw-r--r-- | kombu/transport/base.py | 34 | ||||
-rw-r--r-- | kombu/transport/librabbitmq.py | 6 | ||||
-rw-r--r-- | kombu/transport/pyamqp.py | 3 | ||||
-rw-r--r-- | kombu/transport/redis.py | 22 | ||||
-rw-r--r-- | kombu/transport/zmq.py | 20 |
6 files changed, 59 insertions, 35 deletions
diff --git a/kombu/transport/amqplib.py b/kombu/transport/amqplib.py index 545dd3a6..6a1efeef 100644 --- a/kombu/transport/amqplib.py +++ b/kombu/transport/amqplib.py @@ -311,14 +311,13 @@ class Transport(base.Transport): ) channel_errors = base.Transport.channel_errors + (AMQPChannelException, ) - nb_keep_draining = True - driver_name = "amqplib" - driver_type = "amqp" + driver_name = 'amqplib' + driver_type = 'amqp' supports_ev = True def __init__(self, client, **kwargs): self.client = client - self.default_port = kwargs.get("default_port") or self.default_port + self.default_port = kwargs.get('default_port') or self.default_port def create_channel(self, connection): return connection.channel() @@ -370,7 +369,7 @@ class Transport(base.Transport): def register_with_event_loop(self, connection, loop): loop.add_reader(connection.method_reader.source.sock, - self.client.drain_nowait_all) + self.on_readable, connection, loop) @property def default_connection_params(self): diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 330d005d..8556db55 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -7,9 +7,13 @@ Base transport interface. """ from __future__ import absolute_import +import errno +import socket + from kombu.exceptions import ChannelError, ConnectionError from kombu.message import Message from kombu.utils import cached_property +from kombu.utils.compat import get_errno __all__ = ['Message', 'StdChannel', 'Management', 'Transport'] @@ -71,10 +75,6 @@ class Transport(object): #: Tuple of errors that can happen due to channel/method failure. channel_errors = (ChannelError, ) - #: For non-blocking use, an eventloop should keep - #: draining events as long as ``connection.more_to_read`` is True. - nb_keep_draining = False - #: Type of driver, can be used to separate transports #: using the AMQP protocol (driver_type: 'amqp'), #: Redis (driver_type: 'redis'), etc... @@ -90,6 +90,8 @@ class Transport(object): #: Set to true if the transport supports the AIO interface. supports_ev = False + __reader = None + def __init__(self, client, **kwargs): self.client = client @@ -120,6 +122,30 @@ class Transport(object): def verify_connection(self, connection): return True + def _reader(self, connection, timeout=socket.timeout, error=socket.error, + get_errno=get_errno, _unavail=(errno.EAGAIN, errno.EINTR)): + drain_events = connection.drain_events + while 1: + try: + yield drain_events(timeout=0) + except timeout: + break + except error as exc: + if get_errno(exc) in _unavail: + break + raise + + def on_readable(self, connection, loop): + reader = self.__reader + if reader is None: + reader = self.__reader = self._reader(connection) + try: + next(reader) + except StopIteration: + reader = self.__reader = self._reader(connection) + next(reader, None) + loop.on_tick.add(reader) + @property def default_connection_params(self): return {} diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py index 54ea918f..8fe06968 100644 --- a/kombu/transport/librabbitmq.py +++ b/kombu/transport/librabbitmq.py @@ -83,11 +83,11 @@ class Transport(base.Transport): driver_name = 'librabbitmq' supports_ev = True - nb_keep_draining = True def __init__(self, client, **kwargs): self.client = client self.default_port = kwargs.get('default_port') or self.default_port + self.__reader = None def driver_version(self): return amqp.__version__ @@ -143,7 +143,9 @@ class Transport(base.Transport): return connection.connected def register_with_event_loop(self, connection, loop): - loop.add_reader(connection.fileno(), self.client.drain_nowait_all) + loop.add_reader( + connection.fileno(), self.on_readable, connection, loop, + ) def get_manager(self, *args, **kwargs): return get_manager(self.client, *args, **kwargs) diff --git a/kombu/transport/pyamqp.py b/kombu/transport/pyamqp.py index 83529568..e8bde415 100644 --- a/kombu/transport/pyamqp.py +++ b/kombu/transport/pyamqp.py @@ -71,7 +71,6 @@ class Transport(base.Transport): amqp.Connection.recoverable_connection_errors recoverable_channel_errors = amqp.Connection.recoverable_channel_errors - nb_keep_draining = True driver_name = 'py-amqp' driver_type = 'amqp' supports_heartbeats = True @@ -119,7 +118,7 @@ class Transport(base.Transport): connection.close() def register_with_event_loop(self, connection, loop): - loop.add_reader(connection.sock, self.client.drain_nowait_all) + loop.add_reader(connection.sock, self.on_readable, connection, loop) def heartbeat_check(self, connection, rate=2): return connection.heartbeat_tick(rate=rate) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 2687e9e4..80372411 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -124,7 +124,7 @@ class QoS(virtual.QoS): def reject(self, delivery_tag, requeue=False): if requeue: - self.restore_by_tag(tag, leftmost=True) + self.restore_by_tag(delivery_tag, leftmost=True) self.ack(delivery_tag) @contextmanager @@ -271,11 +271,14 @@ class MultiChannelPoller(object): num=channel.unacked_restore_limit, ) + def on_readable(self, fileno): + chan, type = self._fd_to_chan[fileno] + if chan.qos.can_consume(): + return chan.handlers[type]() + def handle_event(self, fileno, event): if event & READ: - chan, type = self._fd_to_chan[fileno] - if chan.qos.can_consume(): - return chan.handlers[type](), self + return self.on_readable(fileno), self elif event & ERR: chan, type = self._fd_to_chan[fileno] chan._poll_error(type) @@ -763,19 +766,18 @@ class Transport(virtual.Transport): cycle.on_poll_init(loop.poller) cycle_poll_start = cycle.on_poll_start add_reader = loop.add_reader - handle_event = self.handle_event + on_readable = self.on_readable def on_poll_start(): cycle_poll_start() - [add_reader(fd, handle_event) for fd in cycle.fds] + [add_reader(fd, on_readable, fd) for fd in cycle.fds] loop.on_tick.add(on_poll_start) loop.call_repeatedly(10, cycle.maybe_restore_messages) - def handle_event(self, fileno, event): + def on_readable(self, fileno): """Handle AIO event for one of our file descriptors.""" - ret = self.cycle.handle_event(fileno, event) - if ret: - item, channel = ret + item = self.cycle.on_readable(fileno) + if item: message, queue = item if not queue or queue not in self._callbacks: raise KeyError( diff --git a/kombu/transport/zmq.py b/kombu/transport/zmq.py index 7a33acb0..e23b984e 100644 --- a/kombu/transport/zmq.py +++ b/kombu/transport/zmq.py @@ -71,16 +71,16 @@ class MultiChannelPoller(object): for channel in self._channels: self._register(channel) - def handle_event(self, fileno, event): + def on_readable(self, fileno): chan = self._fd_to_chan[fileno] - return (chan.drain_events(), chan) + return chan.drain_events(), chan def get(self, timeout=None): self.on_poll_start() events = self.poller.poll(timeout) - for fileno, event in events or []: - return self.handle_event(fileno, event) + for fileno, _ in events or []: + return self.on_readable(fileno) raise Empty() @@ -238,7 +238,6 @@ class Transport(virtual.Transport): supports_ev = True polling_interval = None - nb_keep_draining = True def __init__(self, *args, **kwargs): if zmq is None: @@ -253,21 +252,18 @@ class Transport(virtual.Transport): cycle = self.cycle cycle.poller = loop.poller add_reader = loop.add_reader - handle_event = self.handle_event + on_readable = self.on_readable cycle_poll_start = cycle.on_poll_start def on_poll_start(): cycle_poll_start() - [add_reader(fd, handle_event) for fd in cycle.fds] - for fd in cycle.fds: - add_reader(fd, handle_event) + [add_reader(fd, on_readable, fd) for fd in cycle.fds] loop.on_tick.add(on_poll_start) - def handle_event(self, fileno, event): - evt = self.cycle.handle_event(fileno, event) - self._handle_event(evt) + def on_readable(self, fileno): + self._handle_event(self.cycle.on_readable(fileno)) def drain_events(self, connection, timeout=None): more_to_read = False |