diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-08 15:24:18 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-08 15:24:18 +0000 |
| commit | ba3b6c53f4072744aecbac429c8eab66631d84c6 (patch) | |
| tree | 9786da704b6f20b2f8c47e05ad1fe119d02069d3 /qpid/cpp/src/tests | |
| parent | bb13e5e60b83bc44d436d4fdf41cb56af4da7c81 (diff) | |
| download | qpid-python-ba3b6c53f4072744aecbac429c8eab66631d84c6.tar.gz | |
QPID-3603: HA primary sends membership updates to backup brokers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1348113 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 49 |
1 files changed, 42 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 88fb8855ba..6e270851f0 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -26,19 +26,24 @@ from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG from qpidtoollibs import BrokerAgent +from uuid import UUID log = getLogger(__name__) -class QmfHaBroker(object): +class QmfAgent(object): + """Access to a QMF broker agent.""" def __init__(self, address): - self.connection = Connection.establish( + self._connection = Connection.establish( address, client_properties={"qpid.ha-admin":1}) - self.qmf = BrokerAgent(self.connection) - self.ha_broker = self.qmf.getHaBroker() - if not self.ha_broker: - raise Exception("HA module is not loaded on broker at %s"%address) + self._agent = BrokerAgent(self._connection) + assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address) + + def __getattr__(self, name): + a = getattr(self._agent, name) + return a class HaBroker(Broker): + """Start a broker with HA enabled""" def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" @@ -58,6 +63,7 @@ class HaBroker(Broker): assert os.path.exists(self.qpid_config_path) getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. self.qpid_ha_script=import_script(self.qpid_ha_path) + self._agent = None def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args) @@ -65,7 +71,11 @@ class HaBroker(Broker): def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) - def ha_status(self): QmfHaBroker(self.host_port()).ha_broker.status + def agent(self): + if not self._agent: self._agent = QmfAgent(self.host_port()) + return self._agent + + def ha_status(self): self.agent().getHaBroker().status # FIXME aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): @@ -641,6 +651,31 @@ class ReplicationTests(BrokerTest): self.failIf(i < 0) self.assertEqual(log.find("caught up", i), -1) + def test_broker_info(self): + """Check that broker information is correctly published via management""" + cluster = HaCluster(self, 3) + + for broker in cluster: # Make sure HA system-id matches broker's + qmf = broker.agent().getHaBroker() + self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef)) + + cluster_ports = map(lambda b: b.port(), cluster) + cluster_ports.sort() + def ports(qmf): + qmf.update() + return sorted(map(lambda b: b["port"], qmf.members)) + # Check that all brokers have the same membership as the cluster + for broker in cluster: + qmf = broker.agent().getHaBroker() + assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + # Add a new broker, check it is updated everywhere + b = cluster.start() + cluster_ports.append(b.port()) + cluster_ports.sort() + for broker in cluster: + qmf = broker.agent().getHaBroker() + assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |
