diff options
| author | Alan Conway <aconway@apache.org> | 2012-02-29 23:38:00 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-02-29 23:38:00 +0000 |
| commit | 3fb61f5354df0b77325e4fc88aa59213d3000a8e (patch) | |
| tree | 2fe1900c3e715586e75f1d7ba2687b2a7e3f5547 /qpid/cpp/src/tests | |
| parent | c71af5478c87527b4bd0eb9e0e4e37a9b151ea92 (diff) | |
| download | qpid-python-3fb61f5354df0b77325e4fc88aa59213d3000a8e.tar.gz | |
QPID-3603: HA support for stand-alone replication.
- New management method HaBroker.replicate to enable replication.
- qpid-ha tool can enable replication of queues.
- qpid-config tool can create queues with replication enabled.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295339 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 170 |
1 files changed, 137 insertions, 33 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 264a636f29..18f47b17c5 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -24,34 +24,81 @@ from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG - +from qpidtoollibs.broker import BrokerAgent log = getLogger("qpid.ha-tests") class HaBroker(Broker): - def __init__(self, test, args=[], broker_url=None, **kwargs): + def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" - args=["--load-module", BrokerTest.ha_lib, - # FIXME aconway 2012-02-13: workaround slow link failover. - "--link-maintenace-interval=0.1", - "--ha-cluster=yes"] - if broker_url: args += [ "--ha-brokers", broker_url ] + args = copy(args) + args.extend(["--load-module", BrokerTest.ha_lib, + # FIXME aconway 2012-02-13: workaround slow link failover. + "--link-maintenace-interval=0.1", + "--ha-cluster=%s"%ha_cluster]) + if broker_url: args.extend([ "--ha-brokers", broker_url ]) Broker.__init__(self, test, args, **kwargs) + self.commands=os.getenv("PYTHON_COMMANDS") + assert os.path.isdir(self.commands) def promote(self): - assert os.system("$QPID_HA_EXEC promote -b %s"%(self.host_port())) == 0 + assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0 def set_client_url(self, url): assert os.system( - "$QPID_HA_EXEC set --public-brokers=%s -b %s"%(url,self.host_port())) == 0 + "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0 def set_broker_url(self, url): assert os.system( - "$QPID_HA_EXEC set --brokers=%s -b %s"%(url, self.host_port())) == 0 + "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0 + + def replicate(self, from_broker, queue): + assert os.system( + "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + + def config_replicate(self, from_broker, queue): + assert os.system( + "%s/qpid-config --broker=%s add queue --replicate-from %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + + def config_declare(self, queue, replication): + assert os.system( + "%s/qpid-config --broker=%s add queue %s --replication %s"%(self.commands, self.host_port(), queue, replication)) == 0 + +class HaCluster(object): + _cluster_count = 0 + + def __init__(self, test, n, **kwargs): + """Start a cluster of n brokers""" + self.test = test + self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)] + HaCluster._cluster_count += 1 + self[0].promote() + self.url = ",".join([b.host_port() for b in self]) + for b in self: b.set_broker_url(self.url) + + def connect(self, i): + """Connect with reconnect_urls""" + return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) -def set_broker_urls(brokers): - url = ",".join([b.host_port() for b in brokers]) - for b in brokers: b.set_broker_url(url) + def kill(self, i): + """Kill broker i, promote broker i+1""" + self[i].kill() + self[i].expect = EXPECT_EXIT_FAIL + self[(i+1) % len(self)].promote() + + def bounce(self, i): + """Stop and restart a broker in a cluster.""" + self.kill(i) + b = self[i] + self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url) + + # Behave like a list of brokers. + def __len__(self): return len(self._brokers) + def __getitem__(self,index): return self._brokers[index] + def __iter__(self): return self._brokers.__iter__() + + +def qr_node(value="messages"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value class ShortTests(BrokerTest): """Short HA functionality tests.""" @@ -92,6 +139,8 @@ class ShortTests(BrokerTest): """Test basic replication of configuration and messages before and after backup has connected""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. + def queue(name, replicate): return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate) @@ -177,12 +226,9 @@ class ShortTests(BrokerTest): self.assert_browse_retry(p, "foo", msgs[i+1:]) self.assert_browse_retry(b, "foo", msgs[i+1:]) - def qpid_replicate(self, value="messages"): - return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value - def test_sync(self): def queue(name, replicate): - return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate)) + return "%s;{create:always,%s}"%(name, qr_node(replicate)) primary = HaBroker(self, name="primary") primary.promote() p = primary.connect().session() @@ -206,6 +252,7 @@ class ShortTests(BrokerTest): def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary") primary.promote() backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) @@ -213,14 +260,14 @@ class ShortTests(BrokerTest): sender = self.popen( ["qpid-send", "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")), + "--address", "q;{create:always,%s}"%(qr_node("messages")), "--messages=1000", "--content-string=x" ]) receiver = self.popen( ["qpid-receive", "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")), + "--address", "q;{create:always,%s}"%(qr_node("messages")), "--messages=990", "--timeout=10" ]) @@ -239,7 +286,7 @@ class ShortTests(BrokerTest): def test_failover_python(self): """Verify that backups rejects connections and that fail-over works in python client""" - getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) @@ -254,7 +301,7 @@ class ShortTests(BrokerTest): # Test discovery: should connect to primary after reject by backup c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) s = c.session() - sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate())) + sender = s.sender("q;{create:always,%s}"%(qr_node())) self.wait_backup(backup, "q") sender.send("foo") primary.kill() @@ -269,7 +316,7 @@ class ShortTests(BrokerTest): primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) - primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate())) + primary.connect().session().sender("q;{create:always,%s}"%(qr_node())) self.wait_backup(backup, "q") sender = NumberedSender(primary, url=url, queue="q", failover_updates = False) @@ -288,19 +335,75 @@ class ShortTests(BrokerTest): receiver.stop() def test_backup_failover(self): - brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) - for name in ["a","b","c"] ] - url = ",".join([b.host_port() for b in brokers]) - for b in brokers: b.set_broker_url(url) - brokers[0].promote() + """Verify that a backup broker fails over and recovers queue state""" + brokers = HaCluster(self, 3) brokers[0].connect().session().sender( - "q;{create:always,%s}"%(self.qpid_replicate())).send("a") + "q;{create:always,%s}"%(qr_node())).send("a") for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"]) - brokers[0].kill() - brokers[2].promote() # c must fail over to b. - brokers[2].connect().session().sender("q").send("b") - self.assert_browse_backup(brokers[1], "q", ["a","b"]) - for b in brokers[1:]: b.kill() + brokers[0].expect = EXPECT_EXIT_FAIL + brokers.kill(0) + brokers[1].connect().session().sender("q").send("b") + self.assert_browse_backup(brokers[2], "q", ["a","b"]) + s = brokers[1].connect().session() + self.assertEqual("a", s.receiver("q").fetch().content) + s.acknowledge() + self.assert_browse_backup(brokers[2], "q", ["b"]) + + def test_qpid_config_replication(self): + """Set up replication via qpid-config""" + brokers = HaCluster(self,2) + brokers[0].config_declare("q","messages") + brokers[0].connect().session().sender("q").send("foo") + self.assert_browse_backup(brokers[1], "q", ["foo"]) + + def test_standalone_queue_replica(self): + """Test replication of individual queues outside of cluster mode""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. + primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"]) + pc = primary.connect() + ps = pc.session().sender("q;{create:always}") + pr = pc.session().receiver("q;{create:always}") + backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"]) + br = backup.connect().session().receiver("q;{create:always}") + + # Set up replication with qpid-ha + backup.replicate(primary.host_port(), "q") + ps.send("a") + self.assert_browse_backup(backup, "q", ["a"]) + ps.send("b") + self.assert_browse_backup(backup, "q", ["a", "b"]) + self.assertEqual("a", pr.fetch().content) + pr.session.acknowledge() + self.assert_browse_backup(backup, "q", ["b"]) + + # Set up replication with qpid-config + ps2 = pc.session().sender("q2;{create:always}") + backup.config_replicate(primary.host_port(), "q2"); + ps2.send("x") + self.assert_browse_backup(backup, "q2", ["x"]) + + + def test_queue_replica_failover(self): + """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over.""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. + cluster = HaCluster(self, 2) + primary = cluster[0] + pc = cluster.connect(0) + ps = pc.session().sender("q;{create:always,%s}"%qr_node("messages")) + pr = pc.session().receiver("q;{create:always,%s}"%qr_node("messages")) + backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"]) + br = backup.connect().session().receiver("q;{create:always}") + backup.replicate(cluster.url, "q") + ps.send("a") + self.assert_browse_backup(backup, "q", ["a"]) + cluster.bounce(0) + self.assert_browse_backup(backup, "q", ["a"]) + ps.send("b") + self.assert_browse_backup(backup, "q", ["a", "b"]) + cluster.bounce(1) + self.assertEqual("a", pr.fetch().content) + pr.session.acknowledge() + self.assert_browse_backup(backup, "q", ["b"]) def test_lvq(self): """Verify that we replicate to an LVQ correctly""" @@ -328,6 +431,7 @@ class ShortTests(BrokerTest): self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)]) def test_reject(self): + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) |
