summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarti Raudsepp <marti@juffo.org>2022-10-12 16:41:04 +0300
committerGitHub <noreply@github.com>2022-10-12 19:41:04 +0600
commit3a49533bc283eb03dae772ca79a6aa4f1c68f083 (patch)
treea228b2f4b0d0f82aa695dd0760906b1c6f3c45b7
parentde560081d3651844882f3401c8f66f0813043057 (diff)
downloadkombu-3a49533bc283eb03dae772ca79a6aa4f1c68f083.tar.gz
Add separate transport option for retry loop timeout (#1599)
* Add separate transport option for retry loop timeout This only applies when using `Connect.default_channel`. Before this change, the retry loop timeout was set equal to TCP connect timeout (`connect_timeout`), meaning when first connection attempt timeouted, no retry would be attempted. Now if a new transport option `connect_total_timeout` is provided, this overrides `connect_timeout` for the retry loop (but not for TCP connect). * Add tests * Fix isort * Rename to connect_retry_timeout * Reformat * connect_retry_timeout -> connect_retries_timeout * Fix flake8
-rw-r--r--kombu/connection.py5
-rw-r--r--t/mocks.py13
-rw-r--r--t/unit/test_connection.py33
3 files changed, 49 insertions, 2 deletions
diff --git a/kombu/connection.py b/kombu/connection.py
index 793c4d2c..50dbe10d 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -426,7 +426,7 @@ class Connection:
callback (Callable): Optional callback that is called for every
internal iteration (1 s).
timeout (int): Maximum amount of time in seconds to spend
- waiting for connection
+ attempting to connect, total over all retries.
"""
if self.connected:
return self._connection
@@ -867,6 +867,9 @@ class Connection:
conn_opts['interval_step'] = transport_opts['interval_step']
if 'interval_max' in transport_opts:
conn_opts['interval_max'] = transport_opts['interval_max']
+ if 'connect_retries_timeout' in transport_opts:
+ conn_opts['timeout'] = \
+ transport_opts['connect_retries_timeout']
return conn_opts
@property
diff --git a/t/mocks.py b/t/mocks.py
index e4cfa3f7..4c99f010 100644
--- a/t/mocks.py
+++ b/t/mocks.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+import time
from itertools import count
from typing import TYPE_CHECKING
from unittest.mock import Mock
@@ -202,3 +203,15 @@ class Transport(base.Transport):
def close_connection(self, connection):
connection.connected = False
+
+
+class TimeoutingTransport(Transport):
+ recoverable_connection_errors = (TimeoutError,)
+
+ def __init__(self, connect_timeout=1, **kwargs):
+ self.connect_timeout = connect_timeout
+ super().__init__(**kwargs)
+
+ def establish_connection(self):
+ time.sleep(self.connect_timeout)
+ raise TimeoutError('timed out')
diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py
index b1413b7e..740bd6dc 100644
--- a/t/unit/test_connection.py
+++ b/t/unit/test_connection.py
@@ -11,7 +11,7 @@ from kombu import Connection, Consumer, Producer, parse_url
from kombu.connection import Resource
from kombu.exceptions import OperationalError
from kombu.utils.functional import lazy
-from t.mocks import Transport
+from t.mocks import TimeoutingTransport, Transport
class test_connection_utils:
@@ -698,6 +698,37 @@ class test_Connection:
with pytest.raises(OperationalError):
conn.default_channel
+ def test_connection_failover_without_total_timeout(self):
+ with Connection(
+ ['server1', 'server2'],
+ transport=TimeoutingTransport,
+ connect_timeout=1,
+ transport_options={'interval_start': 0, 'interval_step': 0},
+ ) as conn:
+ conn._establish_connection = Mock(
+ side_effect=conn._establish_connection
+ )
+ with pytest.raises(OperationalError):
+ conn.default_channel
+ # Never retried, because `retry_over_time` `timeout` is equal
+ # to `connect_timeout`
+ conn._establish_connection.assert_called_once()
+
+ def test_connection_failover_with_total_timeout(self):
+ with Connection(
+ ['server1', 'server2'],
+ transport=TimeoutingTransport,
+ connect_timeout=1,
+ transport_options={'connect_retries_timeout': 2,
+ 'interval_start': 0, 'interval_step': 0},
+ ) as conn:
+ conn._establish_connection = Mock(
+ side_effect=conn._establish_connection
+ )
+ with pytest.raises(OperationalError):
+ conn.default_channel
+ assert conn._establish_connection.call_count == 2
+
class test_Connection_with_transport_options: