diff options
Diffstat (limited to 'kombu/transport/redis.py')
-rw-r--r-- | kombu/transport/redis.py | 22 |
1 files changed, 12 insertions, 10 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 2687e9e4..80372411 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -124,7 +124,7 @@ class QoS(virtual.QoS): def reject(self, delivery_tag, requeue=False): if requeue: - self.restore_by_tag(tag, leftmost=True) + self.restore_by_tag(delivery_tag, leftmost=True) self.ack(delivery_tag) @contextmanager @@ -271,11 +271,14 @@ class MultiChannelPoller(object): num=channel.unacked_restore_limit, ) + def on_readable(self, fileno): + chan, type = self._fd_to_chan[fileno] + if chan.qos.can_consume(): + return chan.handlers[type]() + def handle_event(self, fileno, event): if event & READ: - chan, type = self._fd_to_chan[fileno] - if chan.qos.can_consume(): - return chan.handlers[type](), self + return self.on_readable(fileno), self elif event & ERR: chan, type = self._fd_to_chan[fileno] chan._poll_error(type) @@ -763,19 +766,18 @@ class Transport(virtual.Transport): cycle.on_poll_init(loop.poller) cycle_poll_start = cycle.on_poll_start add_reader = loop.add_reader - handle_event = self.handle_event + on_readable = self.on_readable def on_poll_start(): cycle_poll_start() - [add_reader(fd, handle_event) for fd in cycle.fds] + [add_reader(fd, on_readable, fd) for fd in cycle.fds] loop.on_tick.add(on_poll_start) loop.call_repeatedly(10, cycle.maybe_restore_messages) - def handle_event(self, fileno, event): + def on_readable(self, fileno): """Handle AIO event for one of our file descriptors.""" - ret = self.cycle.handle_event(fileno, event) - if ret: - item, channel = ret + item = self.cycle.on_readable(fileno) + if item: message, queue = item if not queue or queue not in self._callbacks: raise KeyError( |