diff options
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 11 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 52 |
2 files changed, 57 insertions, 6 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 3207a51b79..ccf25f35b5 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -513,18 +513,21 @@ class BrokerTest(TestCase): finally: r.close() return contents - def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content): + def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" actual_contents = self.browse(session, queue, timeout, transform=transform) - self.assertEqual(expect_contents, actual_contents) + if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) + self.assertEqual(expect_contents, actual_contents, msg) - def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content): + def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg=None): """Wait up to timeout for contents of queue to match expect_contents""" test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents retry(test, timeout, delay) - self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform)) + actual_contents = self.browse(session, queue, 0, transform=transform) + if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) + self.assertEqual(expect_contents, actual_contents, msg) def join(thread, timeout=10): thread.join(timeout) diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 822e07c702..e9d44c21e0 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -100,8 +100,8 @@ class HaCluster(object): def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value -class ShortTests(BrokerTest): - """Short HA functionality tests.""" +class HaTest(BrokerTest): + """Base class for HA test cases, defines convenience functions""" # Wait for an address to become valid. def wait(self, session, address): @@ -135,6 +135,9 @@ class ShortTests(BrokerTest): """Connect to a backup broker as an admin connection""" return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) +class ReplicationTests(HaTest): + """Correctness tests for HA replication.""" + def test_replication(self): """Test basic replication of configuration and messages before and after backup has connected""" @@ -491,6 +494,51 @@ class ShortTests(BrokerTest): # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority) + def test_backup_acquired(self): + """Verify that acquired messages are backed up, for all queue types.""" + class Test: + def __init__(self, queue, arguments, expect): + self.queue = queue + self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%( + self.queue, ",".join(arguments + ["'qpid.replicate':all"])) + self.expect = [str(i) for i in expect] + + def send(self, connection): + """Send messages, then acquire one but don't acknowledge""" + s = connection.session() + for m in range(10): s.sender(self.address).send(str(m)) + s.receiver(self.address).fetch() + + def wait(self, brokertest, backup): + brokertest.wait_backup(backup, self.queue) + + def verify(self, brokertest, backup): + brokertest.assert_browse_backup( + backup, self.queue, self.expect, msg=self.queue) + + tests = [ + Test("plain",[],range(10)), + Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)), + Test("priority",["'qpid.priorities':10"], range(10)), + Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)), + Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9]) + ] + + primary = HaBroker(self, name="primary") + primary.promote() + backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + c = primary.connect() + for t in tests: t.send(c) # Send messages, leave one unacknowledged. + + backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + # Wait for backups to catch up. + for t in tests: + t.wait(self, backup1) + t.wait(self, backup2) + # Verify acquired message was replicated + for t in tests: t.verify(self, backup1) + for t in tests: t.verify(self, backup2) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |
