summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVsevolod Strukchinsky <floatdrop@users.noreply.github.com>2018-09-27 20:38:21 +0500
committerAsif Saif Uddin <auvipy@gmail.com>2018-09-27 21:38:21 +0600
commitb3dc9208837566193deda824bc67dc900c7ed9a6 (patch)
treeb66a10202f3cee13e8a755bfd6d4e0bc19ffe7cd
parent62087a67cfbae4e87718fbb917f5140aa66e8287 (diff)
downloadkombu-b3dc9208837566193deda824bc67dc900c7ed9a6.tar.gz
Fix infinity loop in create_loop (#923)
-rw-r--r--kombu/asynchronous/hub.py18
-rw-r--r--t/unit/asynchronous/test_hub.py19
2 files changed, 12 insertions, 25 deletions
diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py
index e8091746..a8c3124f 100644
--- a/kombu/asynchronous/hub.py
+++ b/kombu/asynchronous/hub.py
@@ -3,7 +3,6 @@
from __future__ import absolute_import, unicode_literals
import errno
-import itertools
from contextlib import contextmanager
from time import sleep
from types import GeneratorType as generator # noqa
@@ -279,24 +278,15 @@ class Hub(object):
consolidate_callback = self.consolidate_callback
on_tick = self.on_tick
propagate = self.propagate_errors
- todo = self._ready
while 1:
+ todo = self._ready
+ self._ready = set()
+
for tick_callback in on_tick:
tick_callback()
- # To avoid infinite loop where one of the callables adds items
- # to self._ready (via call_soon or otherwise), we take pop only
- # N items from the ready set.
- # N represents the current number of items on the set.
- # That way if a todo adds another one to the ready set,
- # we will break early and allow execution of readers and writers.
- current_todos = len(todo)
- for _ in itertools.repeat(None, current_todos):
- if not todo:
- break
-
- item = todo.pop()
+ for item in todo:
if item:
item()
diff --git a/t/unit/asynchronous/test_hub.py b/t/unit/asynchronous/test_hub.py
index 40b3ce5d..6659a163 100644
--- a/t/unit/asynchronous/test_hub.py
+++ b/t/unit/asynchronous/test_hub.py
@@ -508,30 +508,27 @@ class test_Hub:
assert list(hub.scheduler), [1, 2 == 3]
def test_loop__tick_callbacks(self):
- self.hub._ready = Mock(name='_ready')
- self.hub._ready.__len__ = Mock(name="_ready.__len__")
- self.hub._ready.__len__.side_effect = RuntimeError()
ticks = [Mock(name='cb1'), Mock(name='cb2')]
self.hub.on_tick = list(ticks)
- with pytest.raises(RuntimeError):
- next(self.hub.loop)
+ next(self.hub.loop)
ticks[0].assert_called_once_with()
ticks[1].assert_called_once_with()
def test_loop__todo(self):
- self.hub.fire_timers = Mock(name='fire_timers')
- self.hub.fire_timers.side_effect = RuntimeError()
- self.hub.timer = Mock(name='timer')
+ deferred = Mock(name='cb_deferred')
- callbacks = [Mock(name='cb1'), Mock(name='cb2')]
+ def defer():
+ self.hub.call_soon(deferred)
+
+ callbacks = [Mock(name='cb1', wraps=defer), Mock(name='cb2')]
for cb in callbacks:
self.hub.call_soon(cb)
self.hub._ready.add(None)
- with pytest.raises(RuntimeError):
- next(self.hub.loop)
+ next(self.hub.loop)
callbacks[0].assert_called_once_with()
callbacks[1].assert_called_once_with()
+ deferred.assert_not_called()