diff options
Diffstat (limited to 'oslo_messaging/_drivers')
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 7 | ||||
-rw-r--r-- | oslo_messaging/_drivers/common.py | 4 | ||||
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 11 | ||||
-rw-r--r-- | oslo_messaging/_drivers/pool.py | 10 |
4 files changed, 20 insertions, 12 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index cdc21c5..6dc4f0f 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -593,9 +593,10 @@ class AMQPDriverBase(base.BaseDriver): def _get_exchange(self, target): return target.exchange or self._default_exchange - def _get_connection(self, purpose=rpc_common.PURPOSE_SEND): + def _get_connection(self, purpose=rpc_common.PURPOSE_SEND, retry=None): return rpc_common.ConnectionContext(self._connection_pool, - purpose=purpose) + purpose=purpose, + retry=retry) def _get_reply_q(self): with self._reply_q_lock: @@ -641,7 +642,7 @@ class AMQPDriverBase(base.BaseDriver): log_msg = "CAST unique_id: %s " % unique_id try: - with self._get_connection(rpc_common.PURPOSE_SEND) as conn: + with self._get_connection(rpc_common.PURPOSE_SEND, retry) as conn: if notify: exchange = self._get_exchange(target) LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'" diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index 54c6f7f..b6c3adb 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -392,7 +392,7 @@ class ConnectionContext(Connection): If possible the function makes sure to return a connection to the pool. """ - def __init__(self, connection_pool, purpose): + def __init__(self, connection_pool, purpose, retry): """Create a new connection, or get one from the pool.""" self.connection = None self.connection_pool = connection_pool @@ -420,7 +420,7 @@ class ConnectionContext(Connection): pooled = purpose == PURPOSE_SEND if pooled: - self.connection = connection_pool.get() + self.connection = connection_pool.get(retry=retry) else: self.connection = connection_pool.create(purpose) self.pooled = pooled diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 9d99822..d603f89 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -465,13 +465,14 @@ class ConnectionLock(DummyConnectionLock): class Connection(object): """Connection object.""" - def __init__(self, conf, url, purpose): + def __init__(self, conf, url, purpose, retry=None): # NOTE(viktors): Parse config options driver_conf = conf.oslo_messaging_rabbit self.interval_start = driver_conf.rabbit_retry_interval self.interval_stepping = driver_conf.rabbit_retry_backoff self.interval_max = driver_conf.rabbit_interval_max + self.max_retries = retry self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues @@ -741,7 +742,13 @@ class Connection(object): str(exc), interval) self._set_current_channel(None) - self.connection.ensure_connection(errback=on_error) + self.connection.ensure_connection( + errback=on_error, + max_retries=self.max_retries, + interval_start=self.interval_start or 1, + interval_step=self.interval_stepping, + interval_max=self.interval_max, + ) self._set_current_channel(self.connection.channel()) self.set_transport_socket_timeout() diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py index 7d29b78..2353a7a 100644 --- a/oslo_messaging/_drivers/pool.py +++ b/oslo_messaging/_drivers/pool.py @@ -80,7 +80,7 @@ class Pool(object, metaclass=abc.ABCMeta): self._items.append((ttl_watch, item)) self._cond.notify() - def get(self): + def get(self, retry=None): """Return an item from the pool, when one is available. This may cause the calling thread to block. @@ -106,7 +106,7 @@ class Pool(object, metaclass=abc.ABCMeta): # We've grabbed a slot and dropped the lock, now do the creation try: - return self.create() + return self.create(retry=retry) except Exception: with self._cond: self._current_size -= 1 @@ -122,7 +122,7 @@ class Pool(object, metaclass=abc.ABCMeta): return @abc.abstractmethod - def create(self): + def create(self, retry=None): """Construct a new item.""" @@ -141,9 +141,9 @@ class ConnectionPool(Pool): LOG.debug("Idle connection has expired and been closed." " Pool size: %d" % len(self._items)) - def create(self, purpose=common.PURPOSE_SEND): + def create(self, purpose=common.PURPOSE_SEND, retry=None): LOG.debug('Pool creating new connection') - return self.connection_cls(self.conf, self.url, purpose) + return self.connection_cls(self.conf, self.url, purpose, retry=retry) def empty(self): for item in self.iter_free(): |