summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2015-09-29 15:24:57 -0700
committerAsk Solem <ask@celeryproject.org>2015-09-29 15:24:57 -0700
commit7057ca1d61fd48705ea678121729c292e64ea548 (patch)
tree8bf8c9079261dbe10e41a930c370d58d478f10c3
parentcdcc1cf420990221acd84c90a1308ec266d83b91 (diff)
downloadkombu-7057ca1d61fd48705ea678121729c292e64ea548.tar.gz
Redis: Precedence bug
-rw-r--r--kombu/transport/redis.py4
-rw-r--r--kombu/utils/eventio.py2
2 files changed, 2 insertions, 4 deletions
diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py
index d8cd2332..8d3dcb77 100644
--- a/kombu/transport/redis.py
+++ b/kombu/transport/redis.py
@@ -942,8 +942,7 @@ class Transport(virtual.Transport):
def on_poll_start():
cycle_poll_start()
[add_reader(fd, on_readable, fd) for fd in cycle.fds]
- self.on_poll_start = on_poll_start
- loop.call_soon(on_poll_start)
+ loop.on_tick.add(on_poll_start)
loop.call_repeatedly(10, cycle.maybe_restore_messages)
def on_readable(self, fileno):
@@ -956,7 +955,6 @@ class Transport(virtual.Transport):
'Message for queue {0!r} without consumers: {1}'.format(
queue, message))
self._callbacks[queue](message)
- self.on_poll_start()
def _get_errors(self):
"""Utility to import redis-py's exceptions at runtime."""
diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py
index a441e150..3d59d993 100644
--- a/kombu/utils/eventio.py
+++ b/kombu/utils/eventio.py
@@ -216,7 +216,7 @@ class _poll(object):
def poll(self, timeout, round=math.ceil,
POLLIN=POLLIN, POLLOUT=POLLOUT, POLLERR=POLLERR,
READ=READ, WRITE=WRITE, ERR=ERR, Integral=Integral):
- timeout = 0 if timeout and timeout < 0 else round(timeout or 0 * 1e3)
+ timeout = 0 if timeout and timeout < 0 else round((timeout or 0) * 1e3)
try:
event_list = self._quick_poll(timeout)
except (_selecterr, socket.error) as exc: