diff options
author | Marti Raudsepp <marti@juffo.org> | 2022-10-12 16:41:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-12 19:41:04 +0600 |
commit | 3a49533bc283eb03dae772ca79a6aa4f1c68f083 (patch) | |
tree | a228b2f4b0d0f82aa695dd0760906b1c6f3c45b7 | |
parent | de560081d3651844882f3401c8f66f0813043057 (diff) | |
download | kombu-3a49533bc283eb03dae772ca79a6aa4f1c68f083.tar.gz |
Add separate transport option for retry loop timeout (#1599)
* Add separate transport option for retry loop timeout
This only applies when using `Connect.default_channel`.
Before this change, the retry loop timeout was set equal to TCP connect timeout (`connect_timeout`), meaning when first connection attempt timeouted, no retry would be attempted.
Now if a new transport option `connect_total_timeout` is provided, this overrides `connect_timeout` for the retry loop (but not for TCP connect).
* Add tests
* Fix isort
* Rename to connect_retry_timeout
* Reformat
* connect_retry_timeout -> connect_retries_timeout
* Fix flake8
-rw-r--r-- | kombu/connection.py | 5 | ||||
-rw-r--r-- | t/mocks.py | 13 | ||||
-rw-r--r-- | t/unit/test_connection.py | 33 |
3 files changed, 49 insertions, 2 deletions
diff --git a/kombu/connection.py b/kombu/connection.py index 793c4d2c..50dbe10d 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -426,7 +426,7 @@ class Connection: callback (Callable): Optional callback that is called for every internal iteration (1 s). timeout (int): Maximum amount of time in seconds to spend - waiting for connection + attempting to connect, total over all retries. """ if self.connected: return self._connection @@ -867,6 +867,9 @@ class Connection: conn_opts['interval_step'] = transport_opts['interval_step'] if 'interval_max' in transport_opts: conn_opts['interval_max'] = transport_opts['interval_max'] + if 'connect_retries_timeout' in transport_opts: + conn_opts['timeout'] = \ + transport_opts['connect_retries_timeout'] return conn_opts @property @@ -1,5 +1,6 @@ from __future__ import annotations +import time from itertools import count from typing import TYPE_CHECKING from unittest.mock import Mock @@ -202,3 +203,15 @@ class Transport(base.Transport): def close_connection(self, connection): connection.connected = False + + +class TimeoutingTransport(Transport): + recoverable_connection_errors = (TimeoutError,) + + def __init__(self, connect_timeout=1, **kwargs): + self.connect_timeout = connect_timeout + super().__init__(**kwargs) + + def establish_connection(self): + time.sleep(self.connect_timeout) + raise TimeoutError('timed out') diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py index b1413b7e..740bd6dc 100644 --- a/t/unit/test_connection.py +++ b/t/unit/test_connection.py @@ -11,7 +11,7 @@ from kombu import Connection, Consumer, Producer, parse_url from kombu.connection import Resource from kombu.exceptions import OperationalError from kombu.utils.functional import lazy -from t.mocks import Transport +from t.mocks import TimeoutingTransport, Transport class test_connection_utils: @@ -698,6 +698,37 @@ class test_Connection: with pytest.raises(OperationalError): conn.default_channel + def test_connection_failover_without_total_timeout(self): + with Connection( + ['server1', 'server2'], + transport=TimeoutingTransport, + connect_timeout=1, + transport_options={'interval_start': 0, 'interval_step': 0}, + ) as conn: + conn._establish_connection = Mock( + side_effect=conn._establish_connection + ) + with pytest.raises(OperationalError): + conn.default_channel + # Never retried, because `retry_over_time` `timeout` is equal + # to `connect_timeout` + conn._establish_connection.assert_called_once() + + def test_connection_failover_with_total_timeout(self): + with Connection( + ['server1', 'server2'], + transport=TimeoutingTransport, + connect_timeout=1, + transport_options={'connect_retries_timeout': 2, + 'interval_start': 0, 'interval_step': 0}, + ) as conn: + conn._establish_connection = Mock( + side_effect=conn._establish_connection + ) + with pytest.raises(OperationalError): + conn.default_channel + assert conn._establish_connection.call_count == 2 + class test_Connection_with_transport_options: |