diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 59 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 40 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicationTest.cpp | 45 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicationTest.h | 26 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 18 |
11 files changed, 109 insertions, 101 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 0f4c5b2be8..410ebc3114 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -242,12 +242,14 @@ class BrokerReplicator::UpdateTracker { /** Add an exchange name */ void addExchange(Exchange::shared_ptr ex) { - if (repTest.isReplicated(CONFIGURATION, *ex)) initial.insert(ex->getName()); + if (repTest.getLevel(*ex)) + initial.insert(ex->getName()); } /** Add a queue name. */ void addQueue(Queue::shared_ptr q) { - if (repTest.isReplicated(CONFIGURATION, *q)) initial.insert(q->getName()); + if (repTest.getLevel(*q)) + initial.insert(q->getName()); } /** Received an event for name */ @@ -279,7 +281,7 @@ class BrokerReplicator::UpdateTracker { BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), - logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), + logPrefix("Backup: "), replicationTest(NONE), haBroker(hb), broker(hb.getBroker()), exchanges(broker.getExchanges()), queues(broker.getQueues()), link(l), @@ -472,9 +474,7 @@ void BrokerReplicator::route(Deliverable& msg) { void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { Variant::Map argsMap = asMapVoid(values[ARGS]); - bool autoDel = values[AUTODEL].asBool(); - bool excl = values[EXCL].asBool(); - if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) { + if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) { string name = values[QNAME].asString(); QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool()); QPID_LOG(debug, logPrefix << "Queue declare event: " << name); @@ -488,7 +488,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { << name); deleteQueue(name); } - replicateQueue(name, values[DURABLE].asBool(), autoDel, args, + replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args, values[ALTEX].asString()); } } @@ -506,7 +506,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = queues.find(name); - if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) { + if (queue && replicationTest.getLevel(*queue)) { QPID_LOG(debug, logPrefix << "Queue delete event: " << name); if (queueTracker.get()) queueTracker->event(name); deleteQueue(name); @@ -515,8 +515,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGS])); - if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange. - if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { + if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) { string name = values[EXNAME].asString(); QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); if (exchangeTracker.get()) exchangeTracker->event(name); @@ -542,7 +541,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = exchanges.find(name); if (!exchange) { QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name); - } else if (!replicationTest.replicateLevel(exchange->getArgs())) { + } else if (!replicationTest.getLevel(*exchange)) { QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); @@ -559,11 +558,12 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { queues.find(values[QNAME].asString()); framing::FieldTable args; qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); - // We only replicate binds for a replicated queue to replicated - // exchange that both exist locally. - if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) && - replicationTest.replicateLevel(args)) + // We only replicate binds for a replicated queue to replicated exchange + // that both exist locally. Respect the replication level set in the + // bind arguments, but replicate by default. + if (exchange && replicationTest.getLevel(*exchange) && + queue && replicationTest.getLevel(*queue) && + ReplicationTest(ALL).getLevel(args)) { string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() @@ -581,8 +581,8 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { queues.find(values[QNAME].asString()); // We only replicate unbinds for a replicated queue to replicated // exchange that both exist locally. - if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + if (exchange && replicationTest.getLevel(*exchange) && + queue && replicationTest.getLevel(*queue)) { string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() @@ -630,12 +630,7 @@ Variant getHaUuid(const Variant::Map& map) { void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicationTest.isReplicated( - CONFIGURATION, - values[ARGUMENTS].asMap(), - values[AUTODELETE].asBool(), - values[EXCLUSIVE].asBool())) - return; + if (!replicationTest.getLevel(argsMap)) return; string name(values[NAME].asString()); if (!queueTracker.get()) throw Exception(QPID_MSG("Unexpected queue response: " << values)); @@ -664,7 +659,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicationTest.replicateLevel(argsMap)) return; + if (!replicationTest.getLevel(argsMap)) return; string name = values[NAME].asString(); if (!exchangeTracker.get()) throw Exception(QPID_MSG("Unexpected exchange response: " << values)); @@ -718,10 +713,11 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { framing::FieldTable args; qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); - // Automatically replicate binding if queue and exchange exist and are replicated - if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) && - replicationTest.replicateLevel(args)) + // Automatically replicate binding if queue and exchange exist and are replicated. + // Respect replicate setting in binding args but default to replicated. + if (exchange && replicationTest.getLevel(*exchange) && + queue && replicationTest.getLevel(*queue) && + ReplicationTest(ALL).getLevel(args)) { string key = values[BINDING_KEY].asString(); QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName @@ -741,8 +737,7 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { QPID_LOG(trace, logPrefix << "HA Broker response: " << values); ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); - ReplicateLevel primary = replicationTest.replicateLevel( - values[REPLICATE_DEFAULT].asString()); + ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString()); if (mine != primary) throw Exception(QPID_MSG("Replicate default on backup (" << mine << ") does not match primary (" << primary << ")")); @@ -759,7 +754,7 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator( const boost::shared_ptr<Queue>& queue) { - if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) { + if (replicationTest.getLevel(*queue) == ALL) { boost::shared_ptr<QueueReplicator> qr( new QueueReplicator(haBroker, queue, link)); if (!exchanges.registerExchange(qr)) diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 2274f49294..f64c54abe8 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -142,8 +142,8 @@ class BrokerReplicator : public broker::Exchange, void setMembership(const types::Variant::List&); // Set membership from list. std::string logPrefix; - std::string userId, remoteHost; ReplicationTest replicationTest; + std::string userId, remoteHost; HaBroker& haBroker; broker::Broker& broker; broker::ExchangeRegistry& exchanges; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index c4cb640f97..661c518fc7 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -61,7 +61,6 @@ using boost::dynamic_pointer_cast; HaBroker::HaBroker(broker::Broker& b, const Settings& s) : systemId(b.getSystem()->getSystemId().data()), settings(s), - replicationTest(s.replicateDefault.get()), broker(b), observer(new ConnectionObserver(*this, systemId)), role(new StandAlone), diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 7ba023129c..6b15c88e0a 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -25,7 +25,6 @@ #include "BrokerInfo.h" #include "Membership.h" #include "types.h" -#include "ReplicationTest.h" #include "Settings.h" #include "qpid/Url.h" #include "qpid/sys/Mutex.h" @@ -85,7 +84,6 @@ class HaBroker : public management::Manageable void shutdown(const std::string& message); BrokerStatus getStatus() const; - ReplicationTest getReplicationTest() const { return replicationTest; } boost::shared_ptr<ConnectionObserver> getObserver() { return observer; } BrokerInfo getBrokerInfo() const { return membership.getInfo(); } @@ -108,7 +106,6 @@ class HaBroker : public management::Manageable mutable sys::Mutex lock; Url publicUrl, brokerUrl; std::vector<Url> knownBrokers; - ReplicationTest replicationTest; // Independently thread-safe member variables broker::Broker& broker; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 67108fa5f9..93dbbbea85 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -83,7 +83,8 @@ Primary* Primary::instance = 0; Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : haBroker(hb), membership(hb.getMembership()), - logPrefix("Primary: "), active(false) + logPrefix("Primary: "), active(false), + replicationTest(hb.getSettings().replicateDefault.get()) { hb.getMembership().setStatus(RECOVERING); assert(instance == 0); @@ -97,8 +98,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : // the QueueGuards are created. QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect); for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { - boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(*i, haBroker.getReplicationTest(), 0)); + boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0)); backups[i->getSystemId()] = backup; if (!backup->isReady()) expectedBackups.insert(backup); backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards @@ -196,19 +196,25 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) { // NOTE: Called with queue registry lock held. void Primary::queueCreate(const QueuePtr& q) { - if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *q)) { + // Set replication argument. + ReplicateLevel level = replicationTest.useLevel(*q); + QPID_LOG(debug, logPrefix << "Created queue " << q->getName() + << " replication: " << printable(level)); + q->addArgument(QPID_REPLICATE, printable(level).str()); + if (level) { // Give each queue a unique id to avoid confusion of same-named queues. q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true))); - } - Mutex::ScopedLock l(lock); - for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { - i->second->queueCreate(q); - checkReady(i, l); + Mutex::ScopedLock l(lock); + for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { + i->second->queueCreate(q); + checkReady(i, l); + } } } // NOTE: Called with queue registry lock held. void Primary::queueDestroy(const QueuePtr& q) { + QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName()); Mutex::ScopedLock l(lock); for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) i->second->queueDestroy(q); @@ -217,16 +223,21 @@ void Primary::queueDestroy(const QueuePtr& q) { // NOTE: Called with exchange registry lock held. void Primary::exchangeCreate(const ExchangePtr& ex) { - if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *ex)) { + ReplicateLevel level = replicationTest.useLevel(*ex); + QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName() + << " replication: " << printable(level)); + FieldTable args = ex->getArgs(); + args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg. + if (level) { // Give each exchange a unique id to avoid confusion of same-named exchanges. - FieldTable args = ex->getArgs(); args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0]))); - ex->setArgs(args); } + ex->setArgs(args); } // NOTE: Called with exchange registry lock held. -void Primary::exchangeDestroy(const ExchangePtr&) { +void Primary::exchangeDestroy(const ExchangePtr& ex) { + QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName()); // Do nothing } @@ -237,8 +248,7 @@ void Primary::opened(broker::Connection& connection) { BackupMap::iterator i = backups.find(info.getSystemId()); if (i == backups.end()) { QPID_LOG(info, logPrefix << "New backup connected: " << info); - boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(info, haBroker.getReplicationTest(), &connection)); + boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection)); { // Avoid deadlock with queue registry lock. Mutex::ScopedUnlock u(lock); diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 3097695817..ff85837882 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -24,6 +24,7 @@ #include "types.h" #include "BrokerInfo.h" +#include "ReplicationTest.h" #include "Role.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> @@ -104,6 +105,8 @@ class Primary : public Role Membership& membership; std::string logPrefix; bool active; + ReplicationTest replicationTest; + /** * Set of expected backups that must be ready before we declare ourselves * active. These are backups that were known and ready before the primary diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index 394ba3041b..149ee3f85e 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -34,15 +34,16 @@ using sys::Mutex; using boost::bind; RemoteBackup::RemoteBackup( - const BrokerInfo& info, ReplicationTest rt, broker::Connection* c + const BrokerInfo& info, broker::Connection* c ) : logPrefix("Primary: Remote backup "+info.getLogId()+": "), - brokerInfo(info), replicationTest(rt), connection(c), reportedReady(false) + brokerInfo(info), replicationTest(NONE), connection(c), reportedReady(false) {} void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards) { - QPID_LOG(debug, logPrefix << "Setting catch-up queues" << (createGuards ? " and guards" : "")); queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards)); + QPID_LOG(debug, logPrefix << "Set " << catchupQueues.size() << " catch-up queues" + << (createGuards ? " and guards" : "")); } RemoteBackup::~RemoteBackup() { cancel(); } @@ -64,7 +65,7 @@ bool RemoteBackup::isReady() { } void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) { - if (replicationTest.isReplicated(ALL, *q)) { + if (replicationTest.getLevel(*q) == ALL) { QPID_LOG(debug, logPrefix << "Catch-up queue" << (createGuard ? " and guard" : "") << ": " << q->getName()); catchupQueues.insert(q); @@ -105,7 +106,7 @@ void RemoteBackup::ready(const QueuePtr& q) { // Called via ConfigurationObserver::queueCreate and from catchupQueue void RemoteBackup::queueCreate(const QueuePtr& q) { - if (replicationTest.isReplicated(ALL, *q)) + if (replicationTest.getLevel(*q) == ALL) guards[q].reset(new QueueGuard(*q, brokerInfo)); } diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h index a65d916432..769c50457e 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.h +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h @@ -55,7 +55,7 @@ class RemoteBackup /** Note: isReady() can be true after construction *@param connected true if the backup is already connected. */ - RemoteBackup(const BrokerInfo&, ReplicationTest, broker::Connection*); + RemoteBackup(const BrokerInfo&, broker::Connection*); ~RemoteBackup(); /** Set all queues in the registry as catch-up queues. diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp index 1dd32262a0..18e0681f0f 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp @@ -19,6 +19,7 @@ * */ #include "ReplicationTest.h" +#include "qpid/log/Statement.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" @@ -28,53 +29,49 @@ namespace ha { using types::Variant; -ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) { +ReplicateLevel ReplicationTest::getLevel(const std::string& str) { Enum<ReplicateLevel> rl(replicateDefault); if (!str.empty()) rl.parse(str); return rl.get(); } -ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) { +ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) { if (f.isSet(QPID_REPLICATE)) - return replicateLevel(f.getAsString(QPID_REPLICATE)); + return getLevel(f.getAsString(QPID_REPLICATE)); else return replicateDefault; } -ReplicateLevel ReplicationTest::replicateLevel(const Variant::Map& m) { +ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) { Variant::Map::const_iterator i = m.find(QPID_REPLICATE); if (i != m.end()) - return replicateLevel(i->second.asString()); + return getLevel(i->second.asString()); else return replicateDefault; } -namespace { -const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout"); -} - -bool ReplicationTest::isReplicated( - ReplicateLevel level, const Variant::Map& args, bool autodelete, bool exclusive) -{ - bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end(); - return !ignore && replicateLevel(args) >= level; +ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) { + const Variant::Map& qmap(q.getSettings().original); + Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE); + if (i != qmap.end()) + return getLevel(i->second.asString()); + else + return getLevel(q.getSettings().storeSettings); } -bool ReplicationTest::isReplicated( - ReplicateLevel level, const framing::FieldTable& args, bool autodelete, bool exclusive) -{ - bool ignore = autodelete && exclusive && !args.isSet(AUTO_DELETE_TIMEOUT); - return !ignore && replicateLevel(args) >= level; +ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) { + return getLevel(ex.getArgs()); } -bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q) +ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q) { - return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner()); + bool ignore = q.isAutoDelete() && q.hasExclusiveOwner() && + !q.getSettings().autoDeleteDelay; + return ignore ? ReplicationTest(NONE).getLevel(q) : getLevel(q); } -bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Exchange& ex) -{ - return replicateLevel(ex.getArgs()) >= level; +ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) { + return ReplicationTest::getLevel(ex); } diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h index ab6b1a6bcc..7d44d82a21 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.h +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h @@ -48,22 +48,24 @@ class ReplicationTest ReplicationTest(ReplicateLevel replicateDefault_) : replicateDefault(replicateDefault_) {} - // Return the simple replication level, accounting for defaults. - ReplicateLevel replicateLevel(const std::string& str); - ReplicateLevel replicateLevel(const framing::FieldTable& f); - ReplicateLevel replicateLevel(const types::Variant::Map& m); + // Get the replication level set on an object, or default if not set. + ReplicateLevel getLevel(const std::string& str); + ReplicateLevel getLevel(const framing::FieldTable& f); + ReplicateLevel getLevel(const types::Variant::Map& m); + ReplicateLevel getLevel(const broker::Queue&); + ReplicateLevel getLevel(const broker::Exchange&); + + // Calculate level for objects that may not have replication set, + // including auto-delete/exclusive settings. + ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive); + ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive); + ReplicateLevel useLevel(const broker::Queue&); + ReplicateLevel useLevel(const broker::Exchange&); - // Return true if replication for a queue is enabled at level or higher, - // taking account of default level and queue settings. - bool isReplicated(ReplicateLevel level, - const types::Variant::Map& args, bool autodelete, bool exclusive); - bool isReplicated(ReplicateLevel level, - const framing::FieldTable& args, bool autodelete, bool exclusive); - bool isReplicated(ReplicateLevel level, const broker::Queue&); - bool isReplicated(ReplicateLevel level, const broker::Exchange&); private: ReplicateLevel replicateDefault; }; + }} // namespace qpid::ha #endif /*!QPID_HA_REPLICATIONTEST_H*/ diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 3261e34085..587a150dda 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -702,15 +702,17 @@ acl deny all all s.sender("e1;{create:always, node:{type:topic}}") # cluster[1] will be the backup, has extra queues/exchanges + xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}" + node = "node:{%s}"%(xdecl) s = cluster[1].connect_admin().session() - s.sender("q1;{create:always}") - s.sender("q2;{create:always}") - s.sender("e1;{create:always, node:{type:topic}}") - s.sender("e2;{create:always, node:{type:topic}}") + s.sender("q1;{create:always, %s}"%(node)) + s.sender("q2;{create:always, %s}"%(node)) + s.sender("e1;{create:always, node:{type:topic, %s}}"%(xdecl)) + s.sender("e2;{create:always, node:{type:topic, %s}}"%(xdecl)) for a in ["q1", "q2", "e1", "e2"]: cluster[1].wait_backup(a) cluster[0].promote() - # Verify the backup deletes the surpluis queue and exchange + # Verify the backup deletes the surplus queue and exchange cluster[1].wait_status("ready") s = cluster[1].connect_admin().session() self.assertRaises(NotFound, s.receiver, ("q2")); @@ -868,12 +870,14 @@ acl deny all all # Simulate the race by re-creating the objects before promoting the new primary cluster.kill(0, False) + xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}" + node = "node:{%s}"%(xdecl) sn = cluster[1].connect_admin().session() sn.sender("qq;{delete:always}").close() - s = sn.sender("qq;{create:always}") + s = sn.sender("qq;{create:always, %s}"%(node)) s.send("foo") sn.sender("xx;{delete:always}").close() - sn.sender("xx;{create:always,node:{type:topic}}") + sn.sender("xx;{create:always,node:{type:topic,%s}}"%(xdecl)) cluster[1].promote() cluster[1].wait_status("active") # Verify we are not still using the old objects on cluster[2] |
