summaryrefslogtreecommitdiff
path: root/kombu/transport/redis.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/transport/redis.py')
-rw-r--r--kombu/transport/redis.py22
1 files changed, 12 insertions, 10 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index 2687e9e4..80372411 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -124,7 +124,7 @@ class QoS(virtual.QoS):
def reject(self, delivery_tag, requeue=False):
if requeue:
- self.restore_by_tag(tag, leftmost=True)
+ self.restore_by_tag(delivery_tag, leftmost=True)
self.ack(delivery_tag)
@contextmanager
@@ -271,11 +271,14 @@ class MultiChannelPoller(object):
num=channel.unacked_restore_limit,
)
+ def on_readable(self, fileno):
+ chan, type = self._fd_to_chan[fileno]
+ if chan.qos.can_consume():
+ return chan.handlers[type]()
+
def handle_event(self, fileno, event):
if event & READ:
- chan, type = self._fd_to_chan[fileno]
- if chan.qos.can_consume():
- return chan.handlers[type](), self
+ return self.on_readable(fileno), self
elif event & ERR:
chan, type = self._fd_to_chan[fileno]
chan._poll_error(type)
@@ -763,19 +766,18 @@ class Transport(virtual.Transport):
cycle.on_poll_init(loop.poller)
cycle_poll_start = cycle.on_poll_start
add_reader = loop.add_reader
- handle_event = self.handle_event
+ on_readable = self.on_readable
def on_poll_start():
cycle_poll_start()
- [add_reader(fd, handle_event) for fd in cycle.fds]
+ [add_reader(fd, on_readable, fd) for fd in cycle.fds]
loop.on_tick.add(on_poll_start)
loop.call_repeatedly(10, cycle.maybe_restore_messages)
- def handle_event(self, fileno, event):
+ def on_readable(self, fileno):
"""Handle AIO event for one of our file descriptors."""
- ret = self.cycle.handle_event(fileno, event)
- if ret:
- item, channel = ret
+ item = self.cycle.on_readable(fileno)
+ if item:
message, queue = item
if not queue or queue not in self._callbacks:
raise KeyError(