diff options
author | Shahar Lev <shahar_lev@hotmail.com> | 2022-08-24 11:32:35 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-24 14:32:35 +0600 |
commit | 717ad3ddc98d83a7b9e2cb7054bb92a0cd6c8d53 (patch) | |
tree | de9a71c040ede245ceb14156ec9252bd0b3bad63 | |
parent | 0e57a7b3a0edde7bad7061e73f741296ce06c3c8 (diff) | |
download | kombu-717ad3ddc98d83a7b9e2cb7054bb92a0cd6c8d53.tar.gz |
hub: tick delay fix (#1587)
* hub: tick delay fix
todo and timer callbacks can perform actions that
require a tick callback to be executed right away
without polling.
the current order can cause issues
when using single worker with no prefetch (acks late).
related issue in celery:
https://github.com/celery/celery/issues/7718
* add unit test for hub delay fix
* [pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
-rw-r--r-- | kombu/asynchronous/hub.py | 7 | ||||
-rw-r--r-- | t/unit/asynchronous/test_hub.py | 19 |
2 files changed, 22 insertions, 4 deletions
diff --git a/kombu/asynchronous/hub.py b/kombu/asynchronous/hub.py index 14d25b93..e5b1163c 100644 --- a/kombu/asynchronous/hub.py +++ b/kombu/asynchronous/hub.py @@ -300,14 +300,15 @@ class Hub: while 1: todo = self._pop_ready() - for tick_callback in on_tick: - tick_callback() - for item in todo: if item: item() poll_timeout = fire_timers(propagate=propagate) if scheduled else 1 + + for tick_callback in on_tick: + tick_callback() + # print('[[[HUB]]]: %s' % (self.repr_active(),)) if readers or writers: to_consolidate = [] diff --git a/t/unit/asynchronous/test_hub.py b/t/unit/asynchronous/test_hub.py index bcbbb260..27b048b9 100644 --- a/t/unit/asynchronous/test_hub.py +++ b/t/unit/asynchronous/test_hub.py @@ -1,7 +1,7 @@ from __future__ import annotations import errno -from unittest.mock import Mock, call, patch +from unittest.mock import ANY, Mock, call, patch import pytest from vine import promise @@ -542,6 +542,23 @@ class test_Hub: callbacks[1].assert_called_once_with() deferred.assert_not_called() + def test_loop__no_todo_tick_delay(self): + cb = Mock(name='parent') + cb.todo, cb.tick, cb.poller = Mock(), Mock(), Mock() + cb.poller.poll.side_effect = lambda obj: () + self.hub.poller = cb.poller + self.hub.add(2, Mock(), READ) + self.hub.call_soon(cb.todo) + self.hub.on_tick = [cb.tick] + + next(self.hub.loop) + + cb.assert_has_calls([ + call.todo(), + call.tick(), + call.poller.poll(ANY), + ]) + def test__pop_ready_pops_ready_items(self): self.hub._ready.add(None) ret = self.hub._pop_ready() |