diff options
| author | Alan Conway <aconway@apache.org> | 2011-04-19 17:46:03 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2011-04-19 17:46:03 +0000 |
| commit | 29fe430cef681c18fa1b4772a0424e9eb0a14567 (patch) | |
| tree | ed8108cd1173179a7d28114e739e9b6f4c20a998 /cpp/src/tests | |
| parent | c66c6ed7c82f1d245ae958622c5f6aae7938a1d7 (diff) | |
| download | qpid-python-29fe430cef681c18fa1b4772a0424e9eb0a14567.tar.gz | |
QPID-3215: cached exchange reference can cause cluster inconsistencies if exchange is deleted/recreated
SemanticState::route() uses a simple cache variable to avoid looking
up the exchange for every message. However if the exchange in question
is deleted, even if then recreated, this can cause inconsistencies in
a cluster.
Even in a stand-alone broker messages can be routed by a deleted
exchange because of the cache.
Fix is to mark the exchange deleted and check the status when using
the cached exchange.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1095144 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
| -rwxr-xr-x | cpp/src/tests/cluster_tests.py | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 73c20d451d..12f7a2ca9a 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -479,6 +479,54 @@ acl allow all all for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)]) self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)]) + def test_deleted_exchange(self): + """QPID-3215: cached exchange reference can cause cluster inconsistencies + if exchange is deleted/recreated + Verify stand-alone case + """ + cluster = self.cluster() + # Verify we do not route message via an exchange that has been destroyed. + cluster.start() + s0 = cluster[0].connect().session() + self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") + self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") + send0 = s0.sender("ex/foo") + send0.send("foo") + self.assert_browse(s0, "q", ["foo"]) + self.evaluate_address(s0, "ex;{delete:always}") + try: + send0.send("bar") # Should fail, exchange is deleted. + self.fail("Expected not-found exception") + except qpid.messaging.NotFound: pass + # FIXME aconway 2011-04-19: s0 is broken, new session + self.assert_browse(cluster[0].connect().session(), "q", ["foo"]) + + def test_deleted_exchange_inconsistent(self): + """QPID-3215: cached exchange reference can cause cluster inconsistencies + if exchange is deleted/recreated + + Verify cluster inconsistency. + """ + cluster = self.cluster() + cluster.start() + s0 = cluster[0].connect().session() + self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}") + self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}") + send0 = s0.sender("ex/foo") + send0.send("foo") + self.assert_browse(s0, "q", ["foo"]) + + cluster.start() + s1 = cluster[1].connect().session() + self.evaluate_address(s0, "ex;{delete:always}") + try: + send0.send("bar") + self.fail("Expected not-found exception") + except qpid.messaging.NotFound: pass + + self.assert_browse(s1, "q", ["foo"]) + + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): |
