| author | Alan Conway <aconway@apache.org> | 2013-06-17 14:19:10 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-06-17 14:19:10 +0000 |
| commit | 1d55ce68b88256a4de0329d3104436b0c581000c | |
| tree | e94c576b90ee33caddd96d6efccdbf2bab51080c | /qpid/cpp/src/tests |
| parent | 0fb93ca81a3517d66339b3e890282ea4c82546a9 | |
| download | qpid-python-1d55ce68b88256a4de0329d3104436b0c581000c.tar.gz | |
QPID-4348: HA Use independent sequence numbers for identifying messages
Previously, HA code used queue sequence numbers to identify messages.
This assumes that message sequence is identical on primary and backup.
Implementing new features (for example transactions) requires that we tolerate
ordering differences between primary and backups.
This patch introduces a new, queue-scoped HA sequence number managed by the HA
plugin. The HA ID is set *before* the message is enqueued and assigned a queue
sequence number. This means it is possible to identify messages before they are
enqueued, e.g. messages in an open transaction.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1493771 13f79535-47bb-0310-9956-ffa450edef68
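The HA plugin itself is C++ (this commit only touches its tests), but the idea in the commit message can be illustrated with a short sketch: each queue hands out its own replication ID before the message is enqueued, so a backup can identify a message without relying on the primary's enqueue order. The following is a minimal, hypothetical Python sketch; all class and method names are invented for illustration and do not appear in the Qpid source.

```python
# Illustrative sketch only -- not the broker's C++ HA plugin code.
import itertools
from collections import deque

class ReplicatedQueue:
    """A queue whose messages carry a queue-scoped replication ID."""
    def __init__(self, name):
        self.name = name
        self._ids = itertools.count(1)   # independent HA sequence, per queue
        self._messages = deque()         # holds (ha_id, payload) pairs

    def stamp(self, payload):
        """Assign the replication ID *before* enqueue, so the message can be
        identified even while it sits in an open transaction."""
        return (next(self._ids), payload)

    def enqueue(self, stamped):
        self._messages.append(stamped)

    def find(self, ha_id):
        """A backup locates a message by its HA ID, not by queue position."""
        return next((payload for i, payload in self._messages if i == ha_id), None)

q = ReplicatedQueue("q1")
msg = q.stamp("hello")   # ID is known here, before the queue assigns a position
q.enqueue(msg)
assert q.find(msg[0]) == "hello"
```

Because the ID is fixed at stamp time rather than enqueue time, primary and backup can enqueue in different orders (for example when a transaction commits later on one side) and still agree on which message is which.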
Diffstat (limited to 'qpid/cpp/src/tests')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | qpid/cpp/src/tests/DeliveryRecordTest.cpp | 2 |
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 4 |
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 63 |
| -rwxr-xr-x | qpid/cpp/src/tests/qpid-cluster-benchmark | 6 |
4 files changed, 37 insertions, 38 deletions
diff --git a/qpid/cpp/src/tests/DeliveryRecordTest.cpp b/qpid/cpp/src/tests/DeliveryRecordTest.cpp
index c83bd9a6a4..37b3095f81 100644
--- a/qpid/cpp/src/tests/DeliveryRecordTest.cpp
+++ b/qpid/cpp/src/tests/DeliveryRecordTest.cpp
@@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort)
 
     list<DeliveryRecord> records;
     for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
-        DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
+        DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
         r.setId(*i);
         records.push_back(r);
     }
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 7b0d88a27c..9cf721fd01 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -107,7 +107,7 @@ class HaBroker(Broker):
         ha_port = ha_port or HaPort(test)
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=debug+:ha::",
+                 "--log-enable=trace+:ha::", # FIXME aconway 2013-06-14: debug+
                 # Non-standard settings for faster tests.
                 "--link-maintenance-interval=0.1",
                 # Heartbeat and negotiate time are needed so that a broker wont
@@ -177,7 +177,7 @@ acl allow all all
                 self._status = self.ha_status()
                 return self._status == status;
             except ConnectionError: return False
-        assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
+        assert retry(try_get_status, timeout=5), "%s expected=%r, actual=%r"%(
             self, status, self._status)
 
     def wait_queue(self, queue, timeout=1):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 3836381ed2..212e92b0c6 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
 import traceback
 from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
 from qpid.datatypes import uuid4, UUID
@@ -100,7 +100,7 @@ class ReplicationTests(HaBrokerTest):
             self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
 
             # Verify exchange with replicate=configuration
-            b.sender(prefix+"e2/key2").send(Message(prefix+"e2")) 
+            b.sender(prefix+"e2/key2").send(Message(prefix+"e2"))
             self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
 
             b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind.
@@ -284,9 +284,9 @@ class ReplicationTests(HaBrokerTest):
 
             # Set up replication with qpid-ha
             backup.replicate(primary.host_port(), "q")
-            ps.send("a")
+            ps.send("a", timeout=1)
             backup.assert_browse_backup("q", ["a"])
-            ps.send("b")
+            ps.send("b", timeout=1)
             backup.assert_browse_backup("q", ["a", "b"])
             self.assertEqual("a", pr.fetch().content)
             pr.session.acknowledge()
@@ -295,11 +295,11 @@ class ReplicationTests(HaBrokerTest):
             # Set up replication with qpid-config
             ps2 = pc.session().sender("q2;{create:always}")
             backup.config_replicate(primary.host_port(), "q2");
-            ps2.send("x")
+            ps2.send("x", timeout=1)
             backup.assert_browse_backup("q2", ["x"])
         finally: l.restore()
 
-    def test_queue_replica_failover(self):
+    def test_standalone_queue_replica_failover(self):
         """Test individual queue replication from a cluster to a standalone
         backup broker, verify it fails over."""
         l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
@@ -319,6 +319,7 @@ class ReplicationTests(HaBrokerTest):
             backup.assert_browse_backup("q", ["a"])
             ps.send("b")
             backup.assert_browse_backup("q", ["a", "b"])
+            cluster[0].wait_status("ready")
             cluster.bounce(1)
             self.assertEqual("a", pr.fetch().content)
             pr.session.acknowledge()
@@ -331,16 +332,20 @@ class ReplicationTests(HaBrokerTest):
         """Verify that we replicate to an LVQ correctly"""
         cluster = HaCluster(self, 2)
         s = cluster[0].connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
-        def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
-        for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
-            send(*kv)
-        cluster[1].assert_browse_backup("lvq", ["b-1", "a-3", "c-2"])
-        send("b","b-2")
-        cluster[1].assert_browse_backup("lvq", ["a-3", "c-2", "b-2"])
-        send("c","c-3")
-        cluster[1].assert_browse_backup("lvq", ["a-3", "b-2", "c-3"])
-        send("d","d-1")
-        cluster[1].assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"])
+
+        def send(key,value,expect):
+            s.send(Message(content=value,properties={"lvq-key":key}), timeout=1)
+            cluster[1].assert_browse_backup("lvq", expect)
+
+        send("a", "a-1", ["a-1"])
+        send("b", "b-1", ["a-1", "b-1"])
+        send("a", "a-2", ["b-1", "a-2"])
+        send("a", "a-3", ["b-1", "a-3"])
+        send("c", "c-1", ["b-1", "a-3", "c-1"])
+        send("c", "c-2", ["b-1", "a-3", "c-2"])
+        send("b", "b-2", ["a-3", "c-2", "b-2"])
+        send("c", "c-3", ["a-3", "b-2", "c-3"])
+        send("d", "d-1", ["a-3", "b-2", "c-3", "d-1"])
 
     def test_ring(self):
         """Test replication with the ring queue policy"""
@@ -416,8 +421,8 @@ class ReplicationTests(HaBrokerTest):
         def send(self, connection):
             """Send messages, then acquire one but don't acknowledge"""
             s = connection.session()
-            for m in range(10): s.sender(self.address).send(str(m))
-            s.receiver(self.address).fetch()
+            for m in range(10): s.sender(self.address).send(str(m), timeout=1)
+            s.receiver(self.address, timeout=1).fetch()
 
         def verify(self, brokertest, backup):
             backup.assert_browse_backup(self.queue, self.expect, msg=self.queue)
@@ -959,8 +964,7 @@ class LongTests(HaBrokerTest):
                 for s in senders: s.sender.assert_running()
                 for r in receivers: r.receiver.assert_running()
                 checkpoint = [ r.received+100 for r in receivers ]
-                dead = None
-                victim = random.randint(0,2)
+                victim = random.choice([0,1,2,primary]) # Give the primary a better chance.
                 if victim == primary:
                     # Don't kill primary till it is active and the next
                     # backup is ready, otherwise we can lose messages.
@@ -984,13 +988,8 @@ class LongTests(HaBrokerTest):
         finally:
             for s in senders: s.stop()
             for r in receivers: r.stop()
-            unexpected_dead = []
-            for i in xrange(3):
-                if not brokers[i].is_running() and i != dead:
-                    unexpected_dead.append(i)
-                if brokers[i].is_running(): brokers.kill(i, False)
-            if unexpected_dead:
-                raise Exception("Brokers not running: %s"%unexpected_dead)
+            dead = filter(lambda i: not brokers[i].is_running(), xrange(3))
+            if dead: raise Exception("Brokers not running: %s"%dead)
 
     def test_qmf_order(self):
         """QPID 4402: HA QMF events can be out of order.
@@ -1066,7 +1065,7 @@ class RecoveryTests(HaBrokerTest):
         # Create a queue before the failure.
         s1 = cluster.connect(0).session().sender("q1;{create:always}")
         for b in cluster: b.wait_backup("q1")
-        for i in xrange(10): s1.send(str(i))
+        for i in xrange(10): s1.send(str(i), timeout=0.1)
 
         # Kill primary and 2 backups
         cluster[3].wait_status("ready")
@@ -1125,17 +1124,17 @@ class RecoveryTests(HaBrokerTest):
         cluster[0].wait_status("active") # Primary ready
         for b in cluster[1:3]: b.wait_status("ready") # Backups ready
         for i in [0,1]: cluster.kill(i, False)
-        cluster[2].promote()    # New primary, backups will be 1 and 2
+        cluster[2].promote()    # New primary, expected backup will 1
        cluster[2].wait_status("recovering")
         # Should not go active till the expected backup connects or times out.
         self.assertEqual(cluster[2].ha_status(), "recovering")
-        # Messages should be held expected backup times out
+        # Messages should be held till expected backup times out
         s = cluster[2].connect().session().sender("q;{create:always}")
-        for i in xrange(100): s.send(str(i), sync=False)
+        s.send("foo", sync=False)
         # Verify message held initially.
         try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
         except Timeout: pass
-        s.sync(timeout=1)   # And released after the timeout.
+        s.sync(timeout=1)   # And released after the timeout.
         self.assertEqual(cluster[2].ha_status(), "active")
 
     def test_join_ready_cluster(self):
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index 3e6b805692..b72964c1a7 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -1,5 +1,5 @@
 #!/bin/sh
-#
+echo#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
 # distributed with this work for additional information
@@ -26,8 +26,8 @@ REPEAT="--repeat 10"
 QUEUES="-q 6"
 SENDERS="-s 3"
 RECEIVERS="-r 3"
-BROKERS=	# Local broker
-CLIENT_HOSTS=	# No ssh, all clients are local
+BROKERS=	# Local broker
+CLIENT_HOSTS=	# No ssh, all clients are local
 # Connection options
 TCP_NODELAY=false
 RECONNECT=true
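The ha_test.py hunk above tightens `wait_status` from `retry(try_get_status, timeout=20)` to `timeout=5`, i.e. it polls the broker's HA status until it matches or the deadline passes. As a rough sketch of what such a retry-until-timeout helper looks like (the real helper in the Qpid test framework may differ in name, signature, and back-off behaviour; the commented usage example is hypothetical):

```python
# Hedged sketch of a poll-until-true helper, in the spirit of the retry(...)
# call used by wait_status above; not the actual Qpid test-framework code.
import time

def retry(predicate, timeout=5, delay=0.01, max_delay=1.0):
    """Call predicate() with exponential back-off until it returns a true
    value or the timeout expires. Returns the last result (falsy on timeout)."""
    deadline = time.time() + timeout
    result = predicate()
    while not result and time.time() < deadline:
        time.sleep(delay)
        delay = min(delay * 2, max_delay)
        result = predicate()
    return result

# Hypothetical usage: wait up to 5 seconds for a broker to report a status.
# assert retry(lambda: broker.ha_status() == "ready", timeout=5)
```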
