summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers')
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py7
-rw-r--r--oslo_messaging/_drivers/common.py4
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py11
-rw-r--r--oslo_messaging/_drivers/pool.py10
4 files changed, 20 insertions, 12 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index cdc21c5..6dc4f0f 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -593,9 +593,10 @@ class AMQPDriverBase(base.BaseDriver):
def _get_exchange(self, target):
return target.exchange or self._default_exchange
- def _get_connection(self, purpose=rpc_common.PURPOSE_SEND):
+ def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None):
return rpc_common.ConnectionContext(self._connection_pool,
- purpose=purpose)
+ purpose=purpose,
+ retry=retry)
def _get_reply_q(self):
with self._reply_q_lock:
@@ -641,7 +642,7 @@ class AMQPDriverBase(base.BaseDriver):
log_msg = "CAST unique_id: %s " % unique_id
try:
- with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
+ with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn:
if notify:
exchange = self._get_exchange(target)
LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"
diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py
index 54c6f7f..b6c3adb 100644
--- a/oslo_messaging/_drivers/common.py
+++ b/oslo_messaging/_drivers/common.py
@@ -392,7 +392,7 @@ class ConnectionContext(Connection):
If possible the function makes sure to return a connection to the pool.
"""
- def __init__(self, connection_pool, purpose):
+ def __init__(self, connection_pool, purpose, retry):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
@@ -420,7 +420,7 @@ class ConnectionContext(Connection):
pooled = purpose == PURPOSE_SEND
if pooled:
- self.connection = connection_pool.get()
+ self.connection = connection_pool.get(retry=retry)
else:
self.connection = connection_pool.create(purpose)
self.pooled = pooled
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 9d99822..d603f89 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -465,13 +465,14 @@ class ConnectionLock(DummyConnectionLock):
class Connection(object):
"""Connection object."""
- def __init__(self, conf, url, purpose):
+ def __init__(self, conf, url, purpose, retry=None):
# NOTE(viktors): Parse config options
driver_conf = conf.oslo_messaging_rabbit
self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff
self.interval_max = driver_conf.rabbit_interval_max
+ self.max_retries = retry
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
@@ -741,7 +742,13 @@ class Connection(object):
str(exc), interval)
self._set_current_channel(None)
- self.connection.ensure_connection(errback=on_error)
+ self.connection.ensure_connection(
+ errback=on_error,
+ max_retries=self.max_retries,
+ interval_start=self.interval_start or 1,
+ interval_step=self.interval_stepping,
+ interval_max=self.interval_max,
+ )
self._set_current_channel(self.connection.channel())
self.set_transport_socket_timeout()
diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py
index 7d29b78..2353a7a 100644
--- a/oslo_messaging/_drivers/pool.py
+++ b/oslo_messaging/_drivers/pool.py
@@ -80,7 +80,7 @@ class Pool(object, metaclass=abc.ABCMeta):
self._items.append((ttl_watch, item))
self._cond.notify()
- def get(self):
+ def get(self, retry=None):
"""Return an item from the pool, when one is available.
This may cause the calling thread to block.
@@ -106,7 +106,7 @@ class Pool(object, metaclass=abc.ABCMeta):
# We've grabbed a slot and dropped the lock, now do the creation
try:
- return self.create()
+ return self.create(retry=retry)
except Exception:
with self._cond:
self._current_size -= 1
@@ -122,7 +122,7 @@ class Pool(object, metaclass=abc.ABCMeta):
return
@abc.abstractmethod
- def create(self):
+ def create(self, retry=None):
"""Construct a new item."""
@@ -141,9 +141,9 @@ class ConnectionPool(Pool):
LOG.debug("Idle connection has expired and been closed."
" Pool size: %d" % len(self._items))
- def create(self, purpose=common.PURPOSE_SEND):
+ def create(self, purpose=common.PURPOSE_SEND, retry=None):
LOG.debug('Pool creating new connection')
- return self.connection_cls(self.conf, self.url, purpose)
+ return self.connection_cls(self.conf, self.url, purpose, retry=retry)
def empty(self):
for item in self.iter_free():