summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-08 15:24:18 +0000
committerAlan Conway <aconway@apache.org>2012-06-08 15:24:18 +0000
commitba3b6c53f4072744aecbac429c8eab66631d84c6 (patch)
tree9786da704b6f20b2f8c47e05ad1fe119d02069d3 /qpid/cpp/src/tests
parentbb13e5e60b83bc44d436d4fdf41cb56af4da7c81 (diff)
downloadqpid-python-ba3b6c53f4072744aecbac429c8eab66631d84c6.tar.gz
QPID-3603: HA primary sends membership updates to backup brokers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1348113 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py49
1 files changed, 42 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 88fb8855ba..6e270851f0 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -26,19 +26,24 @@ from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG
from qpidtoollibs import BrokerAgent
+from uuid import UUID
log = getLogger(__name__)
-class QmfHaBroker(object):
+class QmfAgent(object):
+ """Access to a QMF broker agent."""
def __init__(self, address):
- self.connection = Connection.establish(
+ self._connection = Connection.establish(
address, client_properties={"qpid.ha-admin":1})
- self.qmf = BrokerAgent(self.connection)
- self.ha_broker = self.qmf.getHaBroker()
- if not self.ha_broker:
- raise Exception("HA module is not loaded on broker at %s"%address)
+ self._agent = BrokerAgent(self._connection)
+ assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
+
+ def __getattr__(self, name):
+ a = getattr(self._agent, name)
+ return a
class HaBroker(Broker):
+ """Start a broker with HA enabled"""
def __init__(self, test, args=[], brokers_url=None, ha_cluster=True,
ha_replicate="all", **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
@@ -58,6 +63,7 @@ class HaBroker(Broker):
assert os.path.exists(self.qpid_config_path)
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
self.qpid_ha_script=import_script(self.qpid_ha_path)
+ self._agent = None
def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args)
@@ -65,7 +71,11 @@ class HaBroker(Broker):
def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
- def ha_status(self): QmfHaBroker(self.host_port()).ha_broker.status
+ def agent(self):
+ if not self._agent: self._agent = QmfAgent(self.host_port())
+ return self._agent
+
+ def ha_status(self): self.agent().getHaBroker().status
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -641,6 +651,31 @@ class ReplicationTests(BrokerTest):
self.failIf(i < 0)
self.assertEqual(log.find("caught up", i), -1)
+ def test_broker_info(self):
+ """Check that broker information is correctly published via management"""
+ cluster = HaCluster(self, 3)
+
+ for broker in cluster: # Make sure HA system-id matches broker's
+ qmf = broker.agent().getHaBroker()
+ self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef))
+
+ cluster_ports = map(lambda b: b.port(), cluster)
+ cluster_ports.sort()
+ def ports(qmf):
+ qmf.update()
+ return sorted(map(lambda b: b["port"], qmf.members))
+ # Check that all brokers have the same membership as the cluster
+ for broker in cluster:
+ qmf = broker.agent().getHaBroker()
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+ # Add a new broker, check it is updated everywhere
+ b = cluster.start()
+ cluster_ports.append(b.port())
+ cluster_ports.sort()
+ for broker in cluster:
+ qmf = broker.agent().getHaBroker()
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit