summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-09-06 19:36:21 +0000
committerAlan Conway <aconway@apache.org>2012-09-06 19:36:21 +0000
commit4a053938c47d097b852457ec0a32c2bdf4a3d048 (patch)
treee504c2d9deef8b95fc7bdf720611da4931ca585a /qpid/cpp/src
parent2b1afdfde49de5678619b04f85059c3a7443fe20 (diff)
downloadqpid-python-4a053938c47d097b852457ec0a32c2bdf4a3d048.tar.gz
QPID-4248: HA does not replicate topic binding keys to backups (Author: Andy Goldstein)
The issue is that QMF query responses call the binding key "bindingKey" while QMF bind events call it "key", and the code was only looking for "key" git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1381728 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp3
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py23
2 files changed, 16 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index fe32753b4e..0b6280da6d 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -93,6 +93,7 @@ const string EXCHANGE("exchange");
const string EXNAME("exName");
const string EXTYPE("exType");
const string KEY("key");
+const string BINDING_KEY("bindingKey");
const string NAME("name");
const string QNAME("qName");
const string QUEUE("queue");
@@ -508,7 +509,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
- string key = values[KEY].asString();
+ string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
<< " queue:" << qName
<< " key:" << key);
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 310ef844bd..4b0f5add01 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -256,8 +256,9 @@ class ReplicationTests(BrokerTest):
def queue(name, replicate):
return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
- def exchange(name, replicate, bindq):
- return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
+ def exchange(name, replicate, bindq, key):
+ return "%s/%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'topic'},x-bindings:[{exchange:'%s',queue:'%s',key:'%s'}]}}"%(name, key, replicate, name, bindq, key)
+
def setup(p, prefix, primary):
"""Create config, send messages on the primary p"""
s = p.sender(queue(prefix+"q1", "all"))
@@ -267,15 +268,15 @@ class ReplicationTests(BrokerTest):
p.acknowledge()
p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
p.sender(queue(prefix+"q3", "none")).send(Message("3"))
- p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
- p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+ p.sender(exchange(prefix+"e1", "all", prefix+"q1", "key1")).send(Message("4"))
+ p.sender(exchange(prefix+"e2", "configuration", prefix+"q2", "key2")).send(Message("5"))
# Test unbind
p.sender(queue(prefix+"q4", "all")).send(Message("6"))
- s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4"))
+ s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4", "key4"))
s3.send(Message("7"))
# Use old connection to unbind
us = primary.connect_old().session(str(uuid4()))
- us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
+ us.exchange_unbind(exchange=prefix+"e4", binding_key="key4", queue=prefix+"q4")
p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
# Need a marker so we can wait till sync is done.
p.sender(queue(prefix+"x", "configuration"))
@@ -292,12 +293,16 @@ class ReplicationTests(BrokerTest):
self.assert_browse_retry(b, prefix+"q2", []) # configuration only
assert not valid_address(b, prefix+"q3")
- b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
+
+ # Verify exchange with replicate=all
+ b.sender(prefix+"e1/key1").send(Message(prefix+"e1"))
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
- b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
+
+ # Verify exchange with replicate=configuration
+ b.sender(prefix+"e2/key2").send(Message(prefix+"e2"))
self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
- b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
+ b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind.
self.assert_browse_retry(b, prefix+"q4", ["6","7"])
primary = HaBroker(self, name="primary")