summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorozamiatin <ozamiatin@mirantis.com>2017-01-12 16:08:53 +0200
committerOleksii Zamiatin <ozamiatin@mirantis.com>2017-01-16 20:09:26 +0000
commit8424dbc7cb3f102e4a0841684c4dc14267cd3da9 (patch)
treeccc34914a2cfa7503324b1ec753a3cb745c24dc0
parentf07652ae2dc6d3b72f966c741c92c1bc349402a6 (diff)
downloadoslo-messaging-5.17.0.tar.gz
[zmq] Dynamic connections failover5.17.0
ZeroMQ manages failover automatically when socket is being connected to multiple hosts. Dynamic connections mode assumes to connect once per message so we connect only to a single host. However we cannot say for sure if this host is still alive. For failover reasons we can connect to a primary host and then add some more hosts as failover and let ZeroMQ to decide where it finally sends the message. Change-Id: I19cbb75aaea8a0b725dd6a8ff665392738e164a1
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py4
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py4
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_options.py5
3 files changed, 12 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
index 3fcada4..44d5b6a 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
@@ -66,6 +66,10 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
def _get_round_robin_host_connection(self, target, socket):
host = self.routing_table.get_round_robin_host(target)
socket.connect_to_host(host)
+ failover_hosts = self.routing_table.get_all_round_robin_hosts(target)
+ upper_bound = self.conf.oslo_messaging_zmq.zmq_failover_connections
+ for host in failover_hosts[:upper_bound]:
+ socket.connect_to_host(host)
def _get_fanout_connection(self, target, socket):
for host in self.routing_table.get_fanout_hosts(target):
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
index 8b07fcb..a023585 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
@@ -105,7 +105,9 @@ class DealerPublisherProxyDynamic(
if not self.publishers:
raise zmq_matchmaker_base.MatchmakerUnavailable()
socket = self.sockets_manager.get_socket()
- socket.connect_to_host(random.choice(tuple(self.publishers)))
+ random.shuffle(self.publishers)
+ for publisher in self.publishers:
+ socket.connect_to_host(publisher)
return socket
def send_request(self, socket, request):
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
index 98dd65a..6ced79a 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
@@ -97,6 +97,11 @@ zmq_opts = [
'means to use direct connections for direct message '
'types (ignored otherwise).'),
+ cfg.IntOpt('zmq_failover_connections', default=2,
+ help='How many additional connections to a host will be made '
+ 'for failover reasons. This option is actual only in '
+ 'dynamic connections mode.'),
+
cfg.PortOpt('rpc_zmq_min_port',
default=49153,
deprecated_group='DEFAULT',