From e54ef8dc737196343ad974c91a86681efca5fb14 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 30 Mar 2012 19:36:48 +0000 Subject: QPID-3603: Keep acquired messages on queues for all queue types. Updated priority and lvq queues to keep acquired messages, and supply them to browsers if requested. This is necessary so replicating subscriptions can back-up these queue types without message loss. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1307582 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/ha_tests.py | 52 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src/tests/ha_tests.py') diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 822e07c702..e9d44c21e0 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -100,8 +100,8 @@ class HaCluster(object): def qr_node(value="all"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value -class ShortTests(BrokerTest): - """Short HA functionality tests.""" +class HaTest(BrokerTest): + """Base class for HA test cases, defines convenience functions""" # Wait for an address to become valid. def wait(self, session, address): @@ -135,6 +135,9 @@ class ShortTests(BrokerTest): """Connect to a backup broker as an admin connection""" return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs) +class ReplicationTests(HaTest): + """Correctness tests for HA replication.""" + def test_replication(self): """Test basic replication of configuration and messages before and after backup has connected""" @@ -491,6 +494,51 @@ class ShortTests(BrokerTest): # self.assert_browse_backup(backup, "q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda m: m.priority) + def test_backup_acquired(self): + """Verify that acquired messages are backed up, for all queue types.""" + class Test: + def __init__(self, queue, arguments, expect): + self.queue = queue + self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%( + self.queue, ",".join(arguments + ["'qpid.replicate':all"])) + self.expect = [str(i) for i in expect] + + def send(self, connection): + """Send messages, then acquire one but don't acknowledge""" + s = connection.session() + for m in range(10): s.sender(self.address).send(str(m)) + s.receiver(self.address).fetch() + + def wait(self, brokertest, backup): + brokertest.wait_backup(backup, self.queue) + + def verify(self, brokertest, backup): + brokertest.assert_browse_backup( + backup, self.queue, self.expect, msg=self.queue) + + tests = [ + Test("plain",[],range(10)), + Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)), + Test("priority",["'qpid.priorities':10"], range(10)), + Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)), + Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9]) + ] + + primary = HaBroker(self, name="primary") + primary.promote() + backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + c = primary.connect() + for t in tests: t.send(c) # Send messages, leave one unacknowledged. + + backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + # Wait for backups to catch up. + for t in tests: + t.wait(self, backup1) + t.wait(self, backup2) + # Verify acquired message was replicated + for t in tests: t.verify(self, backup1) + for t in tests: t.verify(self, backup2) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit -- cgit v1.2.1