summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp59
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h3
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp40
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h3
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp11
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.h2
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.cpp45
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.h26
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py18
11 files changed, 109 insertions, 101 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 0f4c5b2be8..410ebc3114 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -242,12 +242,14 @@ class BrokerReplicator::UpdateTracker {
/** Add an exchange name */
void addExchange(Exchange::shared_ptr ex) {
- if (repTest.isReplicated(CONFIGURATION, *ex)) initial.insert(ex->getName());
+ if (repTest.getLevel(*ex))
+ initial.insert(ex->getName());
}
/** Add a queue name. */
void addQueue(Queue::shared_ptr q) {
- if (repTest.isReplicated(CONFIGURATION, *q)) initial.insert(q->getName());
+ if (repTest.getLevel(*q))
+ initial.insert(q->getName());
}
/** Received an event for name */
@@ -279,7 +281,7 @@ class BrokerReplicator::UpdateTracker {
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
+ logPrefix("Backup: "), replicationTest(NONE),
haBroker(hb), broker(hb.getBroker()),
exchanges(broker.getExchanges()), queues(broker.getQueues()),
link(l),
@@ -472,9 +474,7 @@ void BrokerReplicator::route(Deliverable& msg) {
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
Variant::Map argsMap = asMapVoid(values[ARGS]);
- bool autoDel = values[AUTODEL].asBool();
- bool excl = values[EXCL].asBool();
- if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
+ if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[QNAME].asString();
QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
@@ -488,7 +488,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
<< name);
deleteQueue(name);
}
- replicateQueue(name, values[DURABLE].asBool(), autoDel, args,
+ replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args,
values[ALTEX].asString());
}
}
@@ -506,7 +506,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = queues.find(name);
- if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
+ if (queue && replicationTest.getLevel(*queue)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
if (queueTracker.get()) queueTracker->event(name);
deleteQueue(name);
@@ -515,8 +515,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGS]));
- if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
- if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
+ if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
if (exchangeTracker.get()) exchangeTracker->event(name);
@@ -542,7 +541,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange = exchanges.find(name);
if (!exchange) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
- } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
+ } else if (!replicationTest.getLevel(*exchange)) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
@@ -559,11 +558,12 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
queues.find(values[QNAME].asString());
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
- // We only replicate binds for a replicated queue to replicated
- // exchange that both exist locally.
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
- replicationTest.replicateLevel(args))
+ // We only replicate binds for a replicated queue to replicated exchange
+ // that both exist locally. Respect the replication level set in the
+ // bind arguments, but replicate by default.
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue) &&
+ ReplicationTest(ALL).getLevel(args))
{
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
@@ -581,8 +581,8 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
queues.find(values[QNAME].asString());
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue))
{
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
@@ -630,12 +630,7 @@ Variant getHaUuid(const Variant::Map& map) {
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.isReplicated(
- CONFIGURATION,
- values[ARGUMENTS].asMap(),
- values[AUTODELETE].asBool(),
- values[EXCLUSIVE].asBool()))
- return;
+ if (!replicationTest.getLevel(argsMap)) return;
string name(values[NAME].asString());
if (!queueTracker.get())
throw Exception(QPID_MSG("Unexpected queue response: " << values));
@@ -664,7 +659,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.replicateLevel(argsMap)) return;
+ if (!replicationTest.getLevel(argsMap)) return;
string name = values[NAME].asString();
if (!exchangeTracker.get())
throw Exception(QPID_MSG("Unexpected exchange response: " << values));
@@ -718,10 +713,11 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
- // Automatically replicate binding if queue and exchange exist and are replicated
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
- replicationTest.replicateLevel(args))
+ // Automatically replicate binding if queue and exchange exist and are replicated.
+ // Respect replicate setting in binding args but default to replicated.
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue) &&
+ ReplicationTest(ALL).getLevel(args))
{
string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
@@ -741,8 +737,7 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
- ReplicateLevel primary = replicationTest.replicateLevel(
- values[REPLICATE_DEFAULT].asString());
+ ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString());
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
<< ") does not match primary (" << primary << ")"));
@@ -759,7 +754,7 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
const boost::shared_ptr<Queue>& queue)
{
- if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
+ if (replicationTest.getLevel(*queue) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
new QueueReplicator(haBroker, queue, link));
if (!exchanges.registerExchange(qr))
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 2274f49294..f64c54abe8 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -142,8 +142,8 @@ class BrokerReplicator : public broker::Exchange,
void setMembership(const types::Variant::List&); // Set membership from list.
std::string logPrefix;
- std::string userId, remoteHost;
ReplicationTest replicationTest;
+ std::string userId, remoteHost;
HaBroker& haBroker;
broker::Broker& broker;
broker::ExchangeRegistry& exchanges;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index c4cb640f97..661c518fc7 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -61,7 +61,6 @@ using boost::dynamic_pointer_cast;
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: systemId(b.getSystem()->getSystemId().data()),
settings(s),
- replicationTest(s.replicateDefault.get()),
broker(b),
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 7ba023129c..6b15c88e0a 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -25,7 +25,6 @@
#include "BrokerInfo.h"
#include "Membership.h"
#include "types.h"
-#include "ReplicationTest.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
@@ -85,7 +84,6 @@ class HaBroker : public management::Manageable
void shutdown(const std::string& message);
BrokerStatus getStatus() const;
- ReplicationTest getReplicationTest() const { return replicationTest; }
boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
@@ -108,7 +106,6 @@ class HaBroker : public management::Manageable
mutable sys::Mutex lock;
Url publicUrl, brokerUrl;
std::vector<Url> knownBrokers;
- ReplicationTest replicationTest;
// Independently thread-safe member variables
broker::Broker& broker;
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 67108fa5f9..93dbbbea85 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -83,7 +83,8 @@ Primary* Primary::instance = 0;
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
- logPrefix("Primary: "), active(false)
+ logPrefix("Primary: "), active(false),
+ replicationTest(hb.getSettings().replicateDefault.get())
{
hb.getMembership().setStatus(RECOVERING);
assert(instance == 0);
@@ -97,8 +98,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// the QueueGuards are created.
QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
- boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(*i, haBroker.getReplicationTest(), 0));
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
@@ -196,19 +196,25 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) {
// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
- if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *q)) {
+ // Set replication argument.
+ ReplicateLevel level = replicationTest.useLevel(*q);
+ QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+ << " replication: " << printable(level));
+ q->addArgument(QPID_REPLICATE, printable(level).str());
+ if (level) {
// Give each queue a unique id to avoid confusion of same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
- }
- Mutex::ScopedLock l(lock);
- for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
- i->second->queueCreate(q);
- checkReady(i, l);
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
+ i->second->queueCreate(q);
+ checkReady(i, l);
+ }
}
}
// NOTE: Called with queue registry lock held.
void Primary::queueDestroy(const QueuePtr& q) {
+ QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
Mutex::ScopedLock l(lock);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueDestroy(q);
@@ -217,16 +223,21 @@ void Primary::queueDestroy(const QueuePtr& q) {
// NOTE: Called with exchange registry lock held.
void Primary::exchangeCreate(const ExchangePtr& ex) {
- if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *ex)) {
+ ReplicateLevel level = replicationTest.useLevel(*ex);
+ QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
+ << " replication: " << printable(level));
+ FieldTable args = ex->getArgs();
+ args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg.
+ if (level) {
// Give each exchange a unique id to avoid confusion of same-named exchanges.
- FieldTable args = ex->getArgs();
args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
- ex->setArgs(args);
}
+ ex->setArgs(args);
}
// NOTE: Called with exchange registry lock held.
-void Primary::exchangeDestroy(const ExchangePtr&) {
+void Primary::exchangeDestroy(const ExchangePtr& ex) {
+ QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
// Do nothing
}
@@ -237,8 +248,7 @@ void Primary::opened(broker::Connection& connection) {
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
QPID_LOG(info, logPrefix << "New backup connected: " << info);
- boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(info, haBroker.getReplicationTest(), &connection));
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
{
// Avoid deadlock with queue registry lock.
Mutex::ScopedUnlock u(lock);
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 3097695817..ff85837882 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -24,6 +24,7 @@
#include "types.h"
#include "BrokerInfo.h"
+#include "ReplicationTest.h"
#include "Role.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
@@ -104,6 +105,8 @@ class Primary : public Role
Membership& membership;
std::string logPrefix;
bool active;
+ ReplicationTest replicationTest;
+
/**
* Set of expected backups that must be ready before we declare ourselves
* active. These are backups that were known and ready before the primary
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index 394ba3041b..149ee3f85e 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -34,15 +34,16 @@ using sys::Mutex;
using boost::bind;
RemoteBackup::RemoteBackup(
- const BrokerInfo& info, ReplicationTest rt, broker::Connection* c
+ const BrokerInfo& info, broker::Connection* c
) : logPrefix("Primary: Remote backup "+info.getLogId()+": "),
- brokerInfo(info), replicationTest(rt), connection(c), reportedReady(false)
+ brokerInfo(info), replicationTest(NONE), connection(c), reportedReady(false)
{}
void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards)
{
- QPID_LOG(debug, logPrefix << "Setting catch-up queues" << (createGuards ? " and guards" : ""));
queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards));
+ QPID_LOG(debug, logPrefix << "Set " << catchupQueues.size() << " catch-up queues"
+ << (createGuards ? " and guards" : ""));
}
RemoteBackup::~RemoteBackup() { cancel(); }
@@ -64,7 +65,7 @@ bool RemoteBackup::isReady() {
}
void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
- if (replicationTest.isReplicated(ALL, *q)) {
+ if (replicationTest.getLevel(*q) == ALL) {
QPID_LOG(debug, logPrefix << "Catch-up queue"
<< (createGuard ? " and guard" : "") << ": " << q->getName());
catchupQueues.insert(q);
@@ -105,7 +106,7 @@ void RemoteBackup::ready(const QueuePtr& q) {
// Called via ConfigurationObserver::queueCreate and from catchupQueue
void RemoteBackup::queueCreate(const QueuePtr& q) {
- if (replicationTest.isReplicated(ALL, *q))
+ if (replicationTest.getLevel(*q) == ALL)
guards[q].reset(new QueueGuard(*q, brokerInfo));
}
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index a65d916432..769c50457e 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -55,7 +55,7 @@ class RemoteBackup
/** Note: isReady() can be true after construction
*@param connected true if the backup is already connected.
*/
- RemoteBackup(const BrokerInfo&, ReplicationTest, broker::Connection*);
+ RemoteBackup(const BrokerInfo&, broker::Connection*);
~RemoteBackup();
/** Set all queues in the registry as catch-up queues.
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
index 1dd32262a0..18e0681f0f 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
@@ -19,6 +19,7 @@
*
*/
#include "ReplicationTest.h"
+#include "qpid/log/Statement.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
@@ -28,53 +29,49 @@ namespace ha {
using types::Variant;
-ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) {
+ReplicateLevel ReplicationTest::getLevel(const std::string& str) {
Enum<ReplicateLevel> rl(replicateDefault);
if (!str.empty()) rl.parse(str);
return rl.get();
}
-ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) {
+ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) {
if (f.isSet(QPID_REPLICATE))
- return replicateLevel(f.getAsString(QPID_REPLICATE));
+ return getLevel(f.getAsString(QPID_REPLICATE));
else
return replicateDefault;
}
-ReplicateLevel ReplicationTest::replicateLevel(const Variant::Map& m) {
+ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) {
Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
if (i != m.end())
- return replicateLevel(i->second.asString());
+ return getLevel(i->second.asString());
else
return replicateDefault;
}
-namespace {
-const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
-}
-
-bool ReplicationTest::isReplicated(
- ReplicateLevel level, const Variant::Map& args, bool autodelete, bool exclusive)
-{
- bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end();
- return !ignore && replicateLevel(args) >= level;
+ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) {
+ const Variant::Map& qmap(q.getSettings().original);
+ Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE);
+ if (i != qmap.end())
+ return getLevel(i->second.asString());
+ else
+ return getLevel(q.getSettings().storeSettings);
}
-bool ReplicationTest::isReplicated(
- ReplicateLevel level, const framing::FieldTable& args, bool autodelete, bool exclusive)
-{
- bool ignore = autodelete && exclusive && !args.isSet(AUTO_DELETE_TIMEOUT);
- return !ignore && replicateLevel(args) >= level;
+ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) {
+ return getLevel(ex.getArgs());
}
-bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q)
+ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q)
{
- return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner());
+ bool ignore = q.isAutoDelete() && q.hasExclusiveOwner() &&
+ !q.getSettings().autoDeleteDelay;
+ return ignore ? ReplicationTest(NONE).getLevel(q) : getLevel(q);
}
-bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Exchange& ex)
-{
- return replicateLevel(ex.getArgs()) >= level;
+ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) {
+ return ReplicationTest::getLevel(ex);
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h
index ab6b1a6bcc..7d44d82a21 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.h
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h
@@ -48,22 +48,24 @@ class ReplicationTest
ReplicationTest(ReplicateLevel replicateDefault_) :
replicateDefault(replicateDefault_) {}
- // Return the simple replication level, accounting for defaults.
- ReplicateLevel replicateLevel(const std::string& str);
- ReplicateLevel replicateLevel(const framing::FieldTable& f);
- ReplicateLevel replicateLevel(const types::Variant::Map& m);
+ // Get the replication level set on an object, or default if not set.
+ ReplicateLevel getLevel(const std::string& str);
+ ReplicateLevel getLevel(const framing::FieldTable& f);
+ ReplicateLevel getLevel(const types::Variant::Map& m);
+ ReplicateLevel getLevel(const broker::Queue&);
+ ReplicateLevel getLevel(const broker::Exchange&);
+
+ // Calculate level for objects that may not have replication set,
+ // including auto-delete/exclusive settings.
+ ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive);
+ ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive);
+ ReplicateLevel useLevel(const broker::Queue&);
+ ReplicateLevel useLevel(const broker::Exchange&);
- // Return true if replication for a queue is enabled at level or higher,
- // taking account of default level and queue settings.
- bool isReplicated(ReplicateLevel level,
- const types::Variant::Map& args, bool autodelete, bool exclusive);
- bool isReplicated(ReplicateLevel level,
- const framing::FieldTable& args, bool autodelete, bool exclusive);
- bool isReplicated(ReplicateLevel level, const broker::Queue&);
- bool isReplicated(ReplicateLevel level, const broker::Exchange&);
private:
ReplicateLevel replicateDefault;
};
+
}} // namespace qpid::ha
#endif /*!QPID_HA_REPLICATIONTEST_H*/
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 3261e34085..587a150dda 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -702,15 +702,17 @@ acl deny all all
s.sender("e1;{create:always, node:{type:topic}}")
# cluster[1] will be the backup, has extra queues/exchanges
+ xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
+ node = "node:{%s}"%(xdecl)
s = cluster[1].connect_admin().session()
- s.sender("q1;{create:always}")
- s.sender("q2;{create:always}")
- s.sender("e1;{create:always, node:{type:topic}}")
- s.sender("e2;{create:always, node:{type:topic}}")
+ s.sender("q1;{create:always, %s}"%(node))
+ s.sender("q2;{create:always, %s}"%(node))
+ s.sender("e1;{create:always, node:{type:topic, %s}}"%(xdecl))
+ s.sender("e2;{create:always, node:{type:topic, %s}}"%(xdecl))
for a in ["q1", "q2", "e1", "e2"]: cluster[1].wait_backup(a)
cluster[0].promote()
- # Verify the backup deletes the surpluis queue and exchange
+ # Verify the backup deletes the surplus queue and exchange
cluster[1].wait_status("ready")
s = cluster[1].connect_admin().session()
self.assertRaises(NotFound, s.receiver, ("q2"));
@@ -868,12 +870,14 @@ acl deny all all
# Simulate the race by re-creating the objects before promoting the new primary
cluster.kill(0, False)
+ xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
+ node = "node:{%s}"%(xdecl)
sn = cluster[1].connect_admin().session()
sn.sender("qq;{delete:always}").close()
- s = sn.sender("qq;{create:always}")
+ s = sn.sender("qq;{create:always, %s}"%(node))
s.send("foo")
sn.sender("xx;{delete:always}").close()
- sn.sender("xx;{create:always,node:{type:topic}}")
+ sn.sender("xx;{create:always,node:{type:topic,%s}}"%(xdecl))
cluster[1].promote()
cluster[1].wait_status("active")
# Verify we are not still using the old objects on cluster[2]