summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-29 23:38:00 +0000
committerAlan Conway <aconway@apache.org>2012-02-29 23:38:00 +0000
commit3fb61f5354df0b77325e4fc88aa59213d3000a8e (patch)
tree2fe1900c3e715586e75f1d7ba2687b2a7e3f5547 /qpid/cpp/src/tests
parentc71af5478c87527b4bd0eb9e0e4e37a9b151ea92 (diff)
downloadqpid-python-3fb61f5354df0b77325e4fc88aa59213d3000a8e.tar.gz
QPID-3603: HA support for stand-alone replication.
- New management method HaBroker.replicate to enable replication. - qpid-ha tool can enable replication of queues. - qpid-config tool can create queues with replication enabled. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295339 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py170
1 files changed, 137 insertions, 33 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 264a636f29..18f47b17c5 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -24,34 +24,81 @@ from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG
-
+from qpidtoollibs.broker import BrokerAgent
log = getLogger("qpid.ha-tests")
class HaBroker(Broker):
- def __init__(self, test, args=[], broker_url=None, **kwargs):
+ def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
- args=["--load-module", BrokerTest.ha_lib,
- # FIXME aconway 2012-02-13: workaround slow link failover.
- "--link-maintenace-interval=0.1",
- "--ha-cluster=yes"]
- if broker_url: args += [ "--ha-brokers", broker_url ]
+ args = copy(args)
+ args.extend(["--load-module", BrokerTest.ha_lib,
+ # FIXME aconway 2012-02-13: workaround slow link failover.
+ "--link-maintenace-interval=0.1",
+ "--ha-cluster=%s"%ha_cluster])
+ if broker_url: args.extend([ "--ha-brokers", broker_url ])
Broker.__init__(self, test, args, **kwargs)
+ self.commands=os.getenv("PYTHON_COMMANDS")
+ assert os.path.isdir(self.commands)
def promote(self):
- assert os.system("$QPID_HA_EXEC promote -b %s"%(self.host_port())) == 0
+ assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
def set_client_url(self, url):
assert os.system(
- "$QPID_HA_EXEC set --public-brokers=%s -b %s"%(url,self.host_port())) == 0
+ "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
def set_broker_url(self, url):
assert os.system(
- "$QPID_HA_EXEC set --brokers=%s -b %s"%(url, self.host_port())) == 0
+ "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
+
+ def replicate(self, from_broker, queue):
+ assert os.system(
+ "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+
+ def config_replicate(self, from_broker, queue):
+ assert os.system(
+ "%s/qpid-config --broker=%s add queue --replicate-from %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+
+ def config_declare(self, queue, replication):
+ assert os.system(
+ "%s/qpid-config --broker=%s add queue %s --replication %s"%(self.commands, self.host_port(), queue, replication)) == 0
+
+class HaCluster(object):
+ _cluster_count = 0
+
+ def __init__(self, test, n, **kwargs):
+ """Start a cluster of n brokers"""
+ self.test = test
+ self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+ HaCluster._cluster_count += 1
+ self[0].promote()
+ self.url = ",".join([b.host_port() for b in self])
+ for b in self: b.set_broker_url(self.url)
+
+ def connect(self, i):
+ """Connect with reconnect_urls"""
+ return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
-def set_broker_urls(brokers):
- url = ",".join([b.host_port() for b in brokers])
- for b in brokers: b.set_broker_url(url)
+ def kill(self, i):
+ """Kill broker i, promote broker i+1"""
+ self[i].kill()
+ self[i].expect = EXPECT_EXIT_FAIL
+ self[(i+1) % len(self)].promote()
+
+ def bounce(self, i):
+ """Stop and restart a broker in a cluster."""
+ self.kill(i)
+ b = self[i]
+ self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+
+ # Behave like a list of brokers.
+ def __len__(self): return len(self._brokers)
+ def __getitem__(self,index): return self._brokers[index]
+ def __iter__(self): return self._brokers.__iter__()
+
+
+def qr_node(value="messages"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
class ShortTests(BrokerTest):
"""Short HA functionality tests."""
@@ -92,6 +139,8 @@ class ShortTests(BrokerTest):
"""Test basic replication of configuration and messages before and
after backup has connected"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+
def queue(name, replicate):
return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
@@ -177,12 +226,9 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(p, "foo", msgs[i+1:])
self.assert_browse_retry(b, "foo", msgs[i+1:])
- def qpid_replicate(self, value="messages"):
- return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-
def test_sync(self):
def queue(name, replicate):
- return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
+ return "%s;{create:always,%s}"%(name, qr_node(replicate))
primary = HaBroker(self, name="primary")
primary.promote()
p = primary.connect().session()
@@ -206,6 +252,7 @@ class ShortTests(BrokerTest):
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary")
primary.promote()
backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
@@ -213,14 +260,14 @@ class ShortTests(BrokerTest):
sender = self.popen(
["qpid-send",
"--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+ "--address", "q;{create:always,%s}"%(qr_node("messages")),
"--messages=1000",
"--content-string=x"
])
receiver = self.popen(
["qpid-receive",
"--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+ "--address", "q;{create:always,%s}"%(qr_node("messages")),
"--messages=990",
"--timeout=10"
])
@@ -239,7 +286,7 @@ class ShortTests(BrokerTest):
def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
- getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
@@ -254,7 +301,7 @@ class ShortTests(BrokerTest):
# Test discovery: should connect to primary after reject by backup
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
- sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ sender = s.sender("q;{create:always,%s}"%(qr_node()))
self.wait_backup(backup, "q")
sender.send("foo")
primary.kill()
@@ -269,7 +316,7 @@ class ShortTests(BrokerTest):
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
- primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ primary.connect().session().sender("q;{create:always,%s}"%(qr_node()))
self.wait_backup(backup, "q")
sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
@@ -288,19 +335,75 @@ class ShortTests(BrokerTest):
receiver.stop()
def test_backup_failover(self):
- brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
- for name in ["a","b","c"] ]
- url = ",".join([b.host_port() for b in brokers])
- for b in brokers: b.set_broker_url(url)
- brokers[0].promote()
+ """Verify that a backup broker fails over and recovers queue state"""
+ brokers = HaCluster(self, 3)
brokers[0].connect().session().sender(
- "q;{create:always,%s}"%(self.qpid_replicate())).send("a")
+ "q;{create:always,%s}"%(qr_node())).send("a")
for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
- brokers[0].kill()
- brokers[2].promote() # c must fail over to b.
- brokers[2].connect().session().sender("q").send("b")
- self.assert_browse_backup(brokers[1], "q", ["a","b"])
- for b in brokers[1:]: b.kill()
+ brokers[0].expect = EXPECT_EXIT_FAIL
+ brokers.kill(0)
+ brokers[1].connect().session().sender("q").send("b")
+ self.assert_browse_backup(brokers[2], "q", ["a","b"])
+ s = brokers[1].connect().session()
+ self.assertEqual("a", s.receiver("q").fetch().content)
+ s.acknowledge()
+ self.assert_browse_backup(brokers[2], "q", ["b"])
+
+ def test_qpid_config_replication(self):
+ """Set up replication via qpid-config"""
+ brokers = HaCluster(self,2)
+ brokers[0].config_declare("q","messages")
+ brokers[0].connect().session().sender("q").send("foo")
+ self.assert_browse_backup(brokers[1], "q", ["foo"])
+
+ def test_standalone_queue_replica(self):
+ """Test replication of individual queues outside of cluster mode"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"])
+ pc = primary.connect()
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ br = backup.connect().session().receiver("q;{create:always}")
+
+ # Set up replication with qpid-ha
+ backup.replicate(primary.host_port(), "q")
+ ps.send("a")
+ self.assert_browse_backup(backup, "q", ["a"])
+ ps.send("b")
+ self.assert_browse_backup(backup, "q", ["a", "b"])
+ self.assertEqual("a", pr.fetch().content)
+ pr.session.acknowledge()
+ self.assert_browse_backup(backup, "q", ["b"])
+
+ # Set up replication with qpid-config
+ ps2 = pc.session().sender("q2;{create:always}")
+ backup.config_replicate(primary.host_port(), "q2");
+ ps2.send("x")
+ self.assert_browse_backup(backup, "q2", ["x"])
+
+
+ def test_queue_replica_failover(self):
+ """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ cluster = HaCluster(self, 2)
+ primary = cluster[0]
+ pc = cluster.connect(0)
+ ps = pc.session().sender("q;{create:always,%s}"%qr_node("messages"))
+ pr = pc.session().receiver("q;{create:always,%s}"%qr_node("messages"))
+ backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ br = backup.connect().session().receiver("q;{create:always}")
+ backup.replicate(cluster.url, "q")
+ ps.send("a")
+ self.assert_browse_backup(backup, "q", ["a"])
+ cluster.bounce(0)
+ self.assert_browse_backup(backup, "q", ["a"])
+ ps.send("b")
+ self.assert_browse_backup(backup, "q", ["a", "b"])
+ cluster.bounce(1)
+ self.assertEqual("a", pr.fetch().content)
+ pr.session.acknowledge()
+ self.assert_browse_backup(backup, "q", ["b"])
def test_lvq(self):
"""Verify that we replicate to an LVQ correctly"""
@@ -328,6 +431,7 @@ class ShortTests(BrokerTest):
self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
def test_reject(self):
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())