summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/brokertest.py11
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py52
2 files changed, 57 insertions, 6 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 3207a51b79..ccf25f35b5 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -513,18 +513,21 @@ class BrokerTest(TestCase):
finally: r.close()
return contents
- def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content):
+ def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
actual_contents = self.browse(session, queue, timeout, transform=transform)
- self.assertEqual(expect_contents, actual_contents)
+ if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+ self.assertEqual(expect_contents, actual_contents, msg)
- def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content):
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg=None):
"""Wait up to timeout for contents of queue to match expect_contents"""
test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
retry(test, timeout, delay)
- self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
+ actual_contents = self.browse(session, queue, 0, transform=transform)
+ if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
+ self.assertEqual(expect_contents, actual_contents, msg)
def join(thread, timeout=10):
thread.join(timeout)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 822e07c702..e9d44c21e0 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -100,8 +100,8 @@ class HaCluster(object):
def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-class ShortTests(BrokerTest):
- """Short HA functionality tests."""
+class HaTest(BrokerTest):
+ """Base class for HA test cases, defines convenience functions"""
# Wait for an address to become valid.
def wait(self, session, address):
@@ -135,6 +135,9 @@ class ShortTests(BrokerTest):
"""Connect to a backup broker as an admin connection"""
return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
+class ReplicationTests(HaTest):
+ """Correctness tests for HA replication."""
+
def test_replication(self):
"""Test basic replication of configuration and messages before and
after backup has connected"""
@@ -491,6 +494,51 @@ class ShortTests(BrokerTest):
# self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority)
+ def test_backup_acquired(self):
+ """Verify that acquired messages are backed up, for all queue types."""
+ class Test:
+ def __init__(self, queue, arguments, expect):
+ self.queue = queue
+ self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%(
+ self.queue, ",".join(arguments + ["'qpid.replicate':all"]))
+ self.expect = [str(i) for i in expect]
+
+ def send(self, connection):
+ """Send messages, then acquire one but don't acknowledge"""
+ s = connection.session()
+ for m in range(10): s.sender(self.address).send(str(m))
+ s.receiver(self.address).fetch()
+
+ def wait(self, brokertest, backup):
+ brokertest.wait_backup(backup, self.queue)
+
+ def verify(self, brokertest, backup):
+ brokertest.assert_browse_backup(
+ backup, self.queue, self.expect, msg=self.queue)
+
+ tests = [
+ Test("plain",[],range(10)),
+ Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)),
+ Test("priority",["'qpid.priorities':10"], range(10)),
+ Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)),
+ Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9])
+ ]
+
+ primary = HaBroker(self, name="primary")
+ primary.promote()
+ backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ c = primary.connect()
+ for t in tests: t.send(c) # Send messages, leave one unacknowledged.
+
+ backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ # Wait for backups to catch up.
+ for t in tests:
+ t.wait(self, backup1)
+ t.wait(self, backup2)
+ # Verify acquired message was replicated
+ for t in tests: t.verify(self, backup1)
+ for t in tests: t.verify(self, backup2)
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit