diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 63 |
1 files changed, 31 insertions, 32 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 3836381ed2..212e92b0c6 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -18,7 +18,7 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from qpid.datatypes import uuid4, UUID @@ -100,7 +100,7 @@ class ReplicationTests(HaBrokerTest): self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) # Verify exchange with replicate=configuration - b.sender(prefix+"e2/key2").send(Message(prefix+"e2")) + b.sender(prefix+"e2/key2").send(Message(prefix+"e2")) self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind. @@ -284,9 +284,9 @@ class ReplicationTests(HaBrokerTest): # Set up replication with qpid-ha backup.replicate(primary.host_port(), "q") - ps.send("a") + ps.send("a", timeout=1) backup.assert_browse_backup("q", ["a"]) - ps.send("b") + ps.send("b", timeout=1) backup.assert_browse_backup("q", ["a", "b"]) self.assertEqual("a", pr.fetch().content) pr.session.acknowledge() @@ -295,11 +295,11 @@ class ReplicationTests(HaBrokerTest): # Set up replication with qpid-config ps2 = pc.session().sender("q2;{create:always}") backup.config_replicate(primary.host_port(), "q2"); - ps2.send("x") + ps2.send("x", timeout=1) backup.assert_browse_backup("q2", ["x"]) finally: l.restore() - def test_queue_replica_failover(self): + def test_standalone_queue_replica_failover(self): """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over.""" l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. @@ -319,6 +319,7 @@ class ReplicationTests(HaBrokerTest): backup.assert_browse_backup("q", ["a"]) ps.send("b") backup.assert_browse_backup("q", ["a", "b"]) + cluster[0].wait_status("ready") cluster.bounce(1) self.assertEqual("a", pr.fetch().content) pr.session.acknowledge() @@ -331,16 +332,20 @@ class ReplicationTests(HaBrokerTest): """Verify that we replicate to an LVQ correctly""" cluster = HaCluster(self, 2) s = cluster[0].connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}") - def send(key,value): s.send(Message(content=value,properties={"lvq-key":key})) - for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]: - send(*kv) - cluster[1].assert_browse_backup("lvq", ["b-1", "a-3", "c-2"]) - send("b","b-2") - cluster[1].assert_browse_backup("lvq", ["a-3", "c-2", "b-2"]) - send("c","c-3") - cluster[1].assert_browse_backup("lvq", ["a-3", "b-2", "c-3"]) - send("d","d-1") - cluster[1].assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"]) + + def send(key,value,expect): + s.send(Message(content=value,properties={"lvq-key":key}), timeout=1) + cluster[1].assert_browse_backup("lvq", expect) + + send("a", "a-1", ["a-1"]) + send("b", "b-1", ["a-1", "b-1"]) + send("a", "a-2", ["b-1", "a-2"]) + send("a", "a-3", ["b-1", "a-3"]) + send("c", "c-1", ["b-1", "a-3", "c-1"]) + send("c", "c-2", ["b-1", "a-3", "c-2"]) + send("b", "b-2", ["a-3", "c-2", "b-2"]) + send("c", "c-3", ["a-3", "b-2", "c-3"]) + send("d", "d-1", ["a-3", "b-2", "c-3", "d-1"]) def test_ring(self): """Test replication with the ring queue policy""" @@ -416,8 +421,8 @@ class ReplicationTests(HaBrokerTest): def send(self, connection): """Send messages, then acquire one but don't acknowledge""" s = connection.session() - for m in range(10): s.sender(self.address).send(str(m)) - s.receiver(self.address).fetch() + for m in range(10): s.sender(self.address).send(str(m), timeout=1) + s.receiver(self.address, timeout=1).fetch() def verify(self, brokertest, backup): backup.assert_browse_backup(self.queue, self.expect, msg=self.queue) @@ -959,8 +964,7 @@ class LongTests(HaBrokerTest): for s in senders: s.sender.assert_running() for r in receivers: r.receiver.assert_running() checkpoint = [ r.received+100 for r in receivers ] - dead = None - victim = random.randint(0,2) + victim = random.choice([0,1,2,primary]) # Give the primary a better chance. if victim == primary: # Don't kill primary till it is active and the next # backup is ready, otherwise we can lose messages. @@ -984,13 +988,8 @@ class LongTests(HaBrokerTest): finally: for s in senders: s.stop() for r in receivers: r.stop() - unexpected_dead = [] - for i in xrange(3): - if not brokers[i].is_running() and i != dead: - unexpected_dead.append(i) - if brokers[i].is_running(): brokers.kill(i, False) - if unexpected_dead: - raise Exception("Brokers not running: %s"%unexpected_dead) + dead = filter(lambda i: not brokers[i].is_running(), xrange(3)) + if dead: raise Exception("Brokers not running: %s"%dead) def test_qmf_order(self): """QPID 4402: HA QMF events can be out of order. @@ -1066,7 +1065,7 @@ class RecoveryTests(HaBrokerTest): # Create a queue before the failure. s1 = cluster.connect(0).session().sender("q1;{create:always}") for b in cluster: b.wait_backup("q1") - for i in xrange(10): s1.send(str(i)) + for i in xrange(10): s1.send(str(i), timeout=0.1) # Kill primary and 2 backups cluster[3].wait_status("ready") @@ -1125,17 +1124,17 @@ class RecoveryTests(HaBrokerTest): cluster[0].wait_status("active") # Primary ready for b in cluster[1:3]: b.wait_status("ready") # Backups ready for i in [0,1]: cluster.kill(i, False) - cluster[2].promote() # New primary, backups will be 1 and 2 + cluster[2].promote() # New primary, expected backup will 1 cluster[2].wait_status("recovering") # Should not go active till the expected backup connects or times out. self.assertEqual(cluster[2].ha_status(), "recovering") - # Messages should be held expected backup times out + # Messages should be held till expected backup times out s = cluster[2].connect().session().sender("q;{create:always}") - for i in xrange(100): s.send(str(i), sync=False) + s.send("foo", sync=False) # Verify message held initially. try: s.sync(timeout=.01); self.fail("Expected Timeout exception") except Timeout: pass - s.sync(timeout=1) # And released after the timeout. + s.sync(timeout=1) # And released after the timeout. self.assertEqual(cluster[2].ha_status(), "active") def test_join_ready_cluster(self): |
