summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShahar Lev <shahar_lev@hotmail.com>2022-08-24 11:32:35 +0300
committerGitHub <noreply@github.com>2022-08-24 14:32:35 +0600
commit717ad3ddc98d83a7b9e2cb7054bb92a0cd6c8d53 (patch)
treede9a71c040ede245ceb14156ec9252bd0b3bad63
parent0e57a7b3a0edde7bad7061e73f741296ce06c3c8 (diff)
downloadkombu-717ad3ddc98d83a7b9e2cb7054bb92a0cd6c8d53.tar.gz
hub: tick delay fix (#1587)
* hub: tick delay fix todo and timer callbacks can perform actions that require a tick callback to be executed right away without polling. the current order can cause issues when using single worker with no prefetch (acks late). related issue in celery: https://github.com/celery/celery/issues/7718 * add unit test for hub delay fix * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
-rw-r--r--kombu/asynchronous/hub.py7
-rw-r--r--t/unit/asynchronous/test_hub.py19
2 files changed, 22 insertions, 4 deletions
diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py
index 14d25b93..e5b1163c 100644
--- a/kombu/asynchronous/hub.py
+++ b/kombu/asynchronous/hub.py
@@ -300,14 +300,15 @@ class Hub:
while 1:
todo = self._pop_ready()
- for tick_callback in on_tick:
- tick_callback()
-
for item in todo:
if item:
item()
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
+
+ for tick_callback in on_tick:
+ tick_callback()
+
# print('[[[HUB]]]: %s' % (self.repr_active(),))
if readers or writers:
to_consolidate = []
diff --git a/t/unit/asynchronous/test_hub.py b/t/unit/asynchronous/test_hub.py
index bcbbb260..27b048b9 100644
--- a/t/unit/asynchronous/test_hub.py
+++ b/t/unit/asynchronous/test_hub.py
@@ -1,7 +1,7 @@
from __future__ import annotations
import errno
-from unittest.mock import Mock, call, patch
+from unittest.mock import ANY, Mock, call, patch
import pytest
from vine import promise
@@ -542,6 +542,23 @@ class test_Hub:
callbacks[1].assert_called_once_with()
deferred.assert_not_called()
+ def test_loop__no_todo_tick_delay(self):
+ cb = Mock(name='parent')
+ cb.todo, cb.tick, cb.poller = Mock(), Mock(), Mock()
+ cb.poller.poll.side_effect = lambda obj: ()
+ self.hub.poller = cb.poller
+ self.hub.add(2, Mock(), READ)
+ self.hub.call_soon(cb.todo)
+ self.hub.on_tick = [cb.tick]
+
+ next(self.hub.loop)
+
+ cb.assert_has_calls([
+ call.todo(),
+ call.tick(),
+ call.poller.poll(ANY),
+ ])
+
def test__pop_ready_pops_ready_items(self):
self.hub._ready.add(None)
ret = self.hub._pop_ready()