From 300bfee5f5de83f9f114caa21968f8ad918c44a3 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 19 Apr 2011 17:46:03 +0000 Subject: QPID-3215: cached exchange reference can cause cluster inconsistencies if exchange is deleted/recreated SemanticState::route() uses a simple cache variable to avoid looking up the exchange for every message. However if the exchange in question is deleted, even if then recreated, this can cause inconsistencies in a cluster. Even in a stand-alone broker messages can be routed by a deleted exchange because of the cache. Fix is to mark the exchange deleted and check the status when using the cached exchange. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1095144 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/cluster_tests.py | 48 +++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) (limited to 'qpid/cpp/src/tests') diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 73c20d451d..12f7a2ca9a 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -479,6 +479,54 @@ acl allow all all for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) + def test_deleted_exchange(self): + """QPID-3215: cached exchange reference can cause cluster inconsistencies + if exchange is deleted/recreated + Verify stand-alone case + """ + cluster = self.cluster() + # Verify we do not route message via an exchange that has been destroyed. + cluster.start() + s0 = cluster[0].connect().session() + self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") + self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") + send0 = s0.sender("ex/foo") + send0.send("foo") + self.assert_browse(s0, "q", ["foo"]) + self.evaluate_address(s0, "ex;{delete:always}") + try: + send0.send("bar") # Should fail, exchange is deleted. + self.fail("Expected not-found exception") + except qpid.messaging.NotFound: pass + # FIXME aconway 2011-04-19: s0 is broken, new session + self.assert_browse(cluster[0].connect().session(), "q", ["foo"]) + + def test_deleted_exchange_inconsistent(self): + """QPID-3215: cached exchange reference can cause cluster inconsistencies + if exchange is deleted/recreated + + Verify cluster inconsistency. + """ + cluster = self.cluster() + cluster.start() + s0 = cluster[0].connect().session() + self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") + self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") + send0 = s0.sender("ex/foo") + send0.send("foo") + self.assert_browse(s0, "q", ["foo"]) + + cluster.start() + s1 = cluster[1].connect().session() + self.evaluate_address(s0, "ex;{delete:always}") + try: + send0.send("bar") + self.fail("Expected not-found exception") + except qpid.messaging.NotFound: pass + + self.assert_browse(s1, "q", ["foo"]) + + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION= is set""" def duration(self): -- cgit v1.2.1