diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 23 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 24 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 1 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 112 |
9 files changed, 89 insertions, 97 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 503de3e351..93ad5ec381 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -100,8 +100,8 @@ Role* Backup::recover(Mutex::ScopedLock&) { // Reset membership before allowing backups to connect. backups = membership.otherBackups(); membership.clear(); - return new Primary(haBroker, backups); } + return new Primary(haBroker, backups); } Role* Backup::promote() { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 5e8da17a1b..1587b5b33f 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -865,27 +865,14 @@ bool BrokerReplicator::hasBindings() { return false; } string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } -void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) { +void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) { boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); - // FIXME aconway 2013-11-01: move logic with releaseFromUse to QueueReplicator if (qr) { qr->disconnect(); if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { // Transactions are aborted on failover so clean up tx-queues deleteQueue(qr->getQueue()->getName()); } - else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { - if (qr->getQueue()->getSettings().autoDeleteDelay) { - // Start the auto-delete timer - qr->getQueue()->releaseFromUse(); - qr->getQueue()->scheduleAutoDelete(); - } - else { - // Delete immediately. Don't purge, the primary is gone so we need - // to reroute the deleted messages. - deleteQueue(qr->getQueue()->getName(), false); - } - } } } @@ -893,9 +880,9 @@ typedef vector<boost::shared_ptr<Exchange> > ExchangeVector; // Callback function for accumulating exchange candidates namespace { - void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) { - ev.push_back(i); - } +void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) { + ev.push_back(i); +} } // Called by ConnectionObserver::disconnected, disconnected from the network side. @@ -907,7 +894,7 @@ void BrokerReplicator::disconnected() { ExchangeVector exs; exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1)); for_each(exs.begin(), exs.end(), - boost::bind(&BrokerReplicator::disconnectedExchange, this, _1)); + boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1)); } void BrokerReplicator::setMembership(const Variant::List& brokers) { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index e319ab1219..b3e3fe3223 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -148,7 +148,7 @@ class BrokerReplicator : public broker::Exchange, void deleteQueue(const std::string& name, bool purge=true); void deleteExchange(const std::string& name); - void disconnectedExchange(boost::shared_ptr<broker::Exchange>); + void disconnectedQueueReplicator(boost::shared_ptr<broker::Exchange>); void disconnected(); void setMembership(const types::Variant::List&); // Set membership from list. diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 0c1858ceb1..0c0fe983bb 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -94,7 +94,16 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : logPrefix("Primary: "), active(false), replicationTest(hb.getSettings().replicateDefault.get()) { + // Note that at this point, we are still rejecting client connections. + // So we are safe from client interference while we set up the primary. + hb.getMembership().setStatus(RECOVERING); + + // Process all QueueReplicators, handles auto-delete queues. + QueueReplicator::Vector qrs; + QueueReplicator::copy(hb.getBroker().getExchanges(), qrs); + std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1)); + broker::QueueRegistry& queues = hb.getBroker().getQueues(); queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1)); if (expect.empty()) { diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index a32334bcf9..eeb3312aec 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -26,7 +26,6 @@ #include "QueueGuard.h" #include "RemoteBackup.h" #include "ReplicatingSubscription.h" -#include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" @@ -121,7 +120,7 @@ void PrimaryTxObserver::initialize() { throw InvalidArgumentException( QPID_MSG(logPrefix << "TX replication queue already exists.")); txQueue = result.first; - txQueue->markInUse(true); // Prevent auto-delete till we are done. + txQueue->markInUse(); // Prevent auto-delete till we are done. txQueue->deliver(TxBackupsEvent(backups).message()); } @@ -228,7 +227,8 @@ void PrimaryTxObserver::end(Mutex::ScopedLock&) { // If there are no outstanding completions, break pointer cycle here. // Otherwise break it in cancel() when the remaining completions are done. if (incomplete.empty()) txBuffer = 0; - txQueue->releaseFromUse(true); // txQueue will auto-delete + txQueue->releaseFromUse(); // txQueue will auto-delete + txQueue->scheduleAutoDelete(); txQueue.reset(); try { broker.getExchanges().destroy(getExchangeName()); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index cc6c8a3f30..50f2ececdb 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -50,6 +50,7 @@ using namespace framing::execution; using namespace std; using std::exception; using sys::Mutex; +using boost::shared_ptr; const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); @@ -61,6 +62,17 @@ bool QueueReplicator::isReplicatorName(const std::string& name) { return startsWith(name, QUEUE_REPLICATOR_PREFIX); } +namespace { +void pushIfQr(QueueReplicator::Vector& v, const shared_ptr<Exchange>& ex) { + shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); + if (qr) v.push_back(qr); +} +} + +void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) { + registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1)); +} + class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { public: ErrorListener(const boost::shared_ptr<QueueReplicator>& qr) @@ -116,6 +128,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, framing::FieldTable args = getArgs(); args.setString(QPID_REPLICATE, printable(NONE).str()); setArgs(args); + // Don't allow backup queues to auto-delete, primary decides when to delete. if (q->isAutoDelete()) q->markInUse(); dispatch[DequeueEvent::KEY] = @@ -306,5 +319,16 @@ bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const bool QueueReplicator::hasBindings() { return false; } std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; } +void QueueReplicator::promoted() { + // Promoted to primary, deal with auto-delete now. + if (queue && queue->isAutoDelete() && subscribed) { + // Make a temporary shared_ptr to prevent premature deletion of queue. + // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue + // which could delete the queue while it's still running it's destroyed logic. + boost::shared_ptr<Queue> q(queue); + q->releaseFromUse(); + q->scheduleAutoDelete(); + } +} }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 6fd140fde3..8938285fe3 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -38,6 +38,7 @@ class Queue; class QueueRegistry; class SessionHandler; class Deliverable; +class ExchangeRegistry; } namespace ha { @@ -59,9 +60,12 @@ class QueueReplicator : public broker::Exchange, public: static const std::string QPID_SYNC_FREQUENCY; static const std::string REPLICATOR_PREFIX; + typedef std::vector<boost::shared_ptr<QueueReplicator> > Vector; static std::string replicatorName(const std::string& queueName); static bool isReplicatorName(const std::string&); + /*** Copy QueueReplicators from the registry */ + static void copy(broker::ExchangeRegistry&, Vector& result); QueueReplicator(HaBroker&, boost::shared_ptr<broker::Queue> q, @@ -78,7 +82,6 @@ class QueueReplicator : public broker::Exchange, // Set if the queue has ever been subscribed to, used for auto-delete cleanup. void setSubscribed() { subscribed = true; } - bool isSubscribed() { return subscribed; } boost::shared_ptr<broker::Queue> getQueue() const { return queue; } @@ -90,6 +93,8 @@ class QueueReplicator : public broker::Exchange, bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); bool hasBindings(); + void promoted(); + protected: typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn; typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap; diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index d0b93da85f..95215e1e59 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -22,7 +22,6 @@ #include "Event.h" #include "IdSetter.h" #include "QueueGuard.h" -#include "QueueReplicator.h" #include "QueueSnapshots.h" #include "ReplicatingSubscription.h" #include "TxReplicatingSubscription.h" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 138868f64e..1a5d6ddff8 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -34,6 +34,14 @@ log = getLogger(__name__) class HaBrokerTest(BrokerTest): """Base class for HA broker tests""" +def alt_setup(session, suffix): + # Create exchange to use as alternate and a queue bound to it. + # altex exchange: acts as alternate exchange + session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix)) + # altq queue bound to altex, collect re-routed messages. + session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix)) + + class ReplicationTests(HaBrokerTest): """Correctness tests for HA replication.""" @@ -718,19 +726,44 @@ acl deny all all except NotFound: pass assert not cluster[1].agent().getQueue("q") # Should not be in QMF - def alt_setup(self, session, suffix): - # Create exchange to use as alternate and a queue bound to it. - # altex exchange: acts as alternate exchange - session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix)) - # altq queue bound to altex, collect re-routed messages. - session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix)) + def test_auto_delete_failover(self): + """Test auto-delete queues. Verify that: + - queues auto-deleted on the primary are deleted on the backup. + - auto-delete queues with/without timeout are deleted after a failover. + - messages are correctly routed to the alternate exchange. + """ + cluster = HaCluster(self, 3) + s = cluster[0].connect().session() + def setup(q, timeout=""): + if timeout: timeout = ",arguments:{'qpid.auto_delete_timeout':%s}"%timeout + # Create alternate exchange, auto-delete queue and queue bound to alt. ex. + s.sender("%s-altex;{create:always,node:{type:topic,x-declare:{type:fanout}}}"%q) + qs = s.sender("%s;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:%s-altex%s}}}"%(q,q,timeout)) + s.sender("%s-altq;{create:always,node:{x-bindings:[{exchange:%s-altex,queue:%s-altq}]}}"%(q,q,q)) + qs.send(q) # Send a message to the auto-delete queue + return s + + for args in [("q1",""),("q2","0"),("q3","1"),("q4",""),("q5","")]: setup(*args) + receivers = [s.receiver("q%s"%i) for i in [1,2,3,4]] # Subscribe to queues + # Note q5 is never subscribed to, so should not be auto-deleted. + receivers[3].close() # Trigger auto-delete for q4 + cluster[0].kill(final=False) + cluster[2].promote() + cluster.restart(0) + cluster[2].assert_browse("q3",["q3"]) # Not yet auto-deleted, 1 sec timeout. + for i in [2,1,0]: + for q in ["q1", "q2", "q3","q4"]: + cluster[i].wait_no_queue(q,timeout=2) # auto-deleted + cluster[i].assert_browse_backup("%s-altq"%q, [q]) # Routed to alternate + cluster[i].assert_browse_backup("q5", ["q5"]) # Never subscribed, not deleted. + cluster[i].assert_browse_backup("q5-altq", []) def test_auto_delete_close(self): """Verify auto-delete queues are deleted on backup if auto-deleted on primary""" cluster=HaCluster(self, 2) p = cluster[0].connect().session() - self.alt_setup(p, "1") + alt_setup(p, "1") r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1) s = p.sender("adq1") for m in ["aa","bb","cc"]: s.send(m) @@ -742,71 +775,6 @@ acl deny all all cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"]) cluster[1].wait_queue("adq2") - def test_auto_delete_crash(self): - """Verify auto-delete queues are deleted on backup if the primary crashes""" - cluster=HaCluster(self, 2) - p = cluster[0].connect().session() - self.alt_setup(p,"1") - - # adq1 is subscribed so will be auto-deleted. - r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1) - s = p.sender("adq1") - for m in ["aa","bb","cc"]: s.send(m) - # adq2 is subscribed after cluster[2] starts. - p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}") - # adq3 is never subscribed. - p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}") - - cluster.start() - cluster[2].wait_status("ready") - - p.receiver("adq2") # Subscribed after cluster[2] joined - - for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q) - for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q) - cluster[0].kill() - - cluster[1].wait_no_queue("adq1") - cluster[1].wait_no_queue("adq2") - cluster[1].wait_queue("adq3") - - cluster[2].wait_no_queue("adq1") - cluster[2].wait_no_queue("adq2") - cluster[2].wait_queue("adq3") - - cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"]) - cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"]) - - def test_auto_delete_timeout(self): - cluster = HaCluster(self, 2) - # Test timeout - r1 = cluster[0].connect().session().receiver("q1;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}") - # Test special case of timeout = 0 - r0 = cluster[0].connect().session().receiver("q0;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':0}}}}") - cluster[1].wait_queue("q0") - cluster[1].wait_queue("q1") - cluster[0].kill() - cluster[1].wait_queue("q1") # Not timed out yet - cluster[1].wait_no_queue("q1", timeout=5) # Wait for timeout - cluster[1].wait_no_queue("q0", timeout=5) # Wait for timeout - - def test_alt_exchange_dup(self): - """QPID-4349: if a queue has an alterante exchange and is deleted the - messages appear twice on the alternate, they are rerouted once by the - primary and again by the backup.""" - cluster = HaCluster(self,2) - - # Set up q with alternate exchange altex bound to altq. - s = cluster[0].connect().session() - s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") - s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") - snd = s.sender("q;{create:always,node:{x-declare:{alternate-exchange:'altex'}}}") - messages = [ str(n) for n in xrange(10) ] - for m in messages: snd.send(m) - cluster[1].assert_browse_backup("q", messages) - s.sender("q;{delete:always}").close() - cluster[1].assert_browse_backup("altq", messages) - def test_expired(self): """Regression test for QPID-4379: HA does not properly handle expired messages""" # Race between messages expiring and HA replicating consumer. |
