diff options
| author | Alan Conway <aconway@apache.org> | 2013-09-03 16:07:20 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-09-03 16:07:20 +0000 |
| commit | 7ee553fc7fd90d55b672507fdfccadbb6d2a76c2 (patch) | |
| tree | 592bcba30ad0299505cadc60741860c90472aa2a /qpid/cpp/src | |
| parent | 352ea7a718a4ab1707a2448e33512b7a00ec905b (diff) | |
| download | qpid-python-7ee553fc7fd90d55b672507fdfccadbb6d2a76c2.tar.gz | |
NO-JIRA: HA minor cleanup of disconnect logic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1519738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 51 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 5 |
3 files changed, 21 insertions, 39 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 36bf89fb81..e7a0218dd8 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -878,54 +878,41 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } -void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) { +void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) { boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); - if (!qr) return; - assert(qr); - if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { - if (qr->getQueue()->getSettings().autoDeleteDelay) { - // Start the auto-delete timer - qr->getQueue()->releaseFromUse(); - qr->getQueue()->scheduleAutoDelete(); + if (qr) { + qr->disconnect(); + if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { + // Transactions are aborted on failover so clean up tx-queues + deleteQueue(qr->getQueue()->getName()); } - else { - // Delete immediately. Don't purge, the primary is gone so we need - // to reroute the deleted messages. - deleteQueue(qr->getQueue()->getName(), false); + else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { + if (qr->getQueue()->getSettings().autoDeleteDelay) { + // Start the auto-delete timer + qr->getQueue()->releaseFromUse(); + qr->getQueue()->scheduleAutoDelete(); + } + else { + // Delete immediately. Don't purge, the primary is gone so we need + // to reroute the deleted messages. + deleteQueue(qr->getQueue()->getName(), false); + } } } } typedef vector<boost::shared_ptr<Exchange> > ExchangeVector; -typedef vector<boost::shared_ptr<Queue> > QueueVector; // Called by ConnectionObserver::disconnected, disconnected from the network side. void BrokerReplicator::disconnected() { QPID_LOG(info, logPrefix << "Disconnected from primary " << primary); connection = 0; - // Make copys of queues & exchanges so we can work outside the registry lock. - + // Make copy of exchanges so we can work outside the registry lock. ExchangeVector exs; exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, &exs, _1)); for_each(exs.begin(), exs.end(), - boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1)); - - QueueVector qs; - queues.eachQueue(boost::bind(&QueueVector::push_back, &qs, _1)); - for_each(qs.begin(), qs.end(), - boost::bind(&BrokerReplicator::disconnectedQueue, this, _1)); -} - -// Called for queues existing when the backup is disconnected. -void BrokerReplicator::disconnectedQueue(const boost::shared_ptr<Queue>& q) { - QPID_LOG(critical, "BrokerReplicator::disconnectedQueue" << q->getName()); - boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(q->getName()); - if (qr) { - qr->disconnect(); - if (TxReplicator::isTxQueue(q->getName())) - deleteQueue(q->getName()); - } + boost::bind(&BrokerReplicator::disconnectedExchange, this, _1)); } void BrokerReplicator::setMembership(const Variant::List& brokers) { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 8045c2a91f..395f0706d9 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -104,7 +104,6 @@ class BrokerReplicator : public broker::Exchange, void connected(broker::Bridge&, broker::SessionHandler&); void existingQueue(const boost::shared_ptr<broker::Queue>&); void existingExchange(const boost::shared_ptr<broker::Exchange>&); - void disconnectedQueue(const boost::shared_ptr<broker::Queue>&); void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); @@ -140,8 +139,7 @@ class BrokerReplicator : public broker::Exchange, void deleteQueue(const std::string& name, bool purge=true); void deleteExchange(const std::string& name); - void autoDeleteCheck(boost::shared_ptr<broker::Exchange>); - + void disconnectedExchange(boost::shared_ptr<broker::Exchange>); void disconnected(); void setMembership(const types::Variant::List&); // Set membership from list. diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index f13edfb31e..832d10483e 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -83,9 +83,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : logPrefix = "Primary transaction "+shortStr(id)+": "; // The brokers known at this point are the ones that will be included - // in the transaction. Brokers that join later are not included - // Latecomers that have replicated the transaction will be rolled back - // when the tx-queue is deleted. + // in the transaction. Brokers that join later are not included. // BrokerInfo::Set backups(haBroker.getMembership().otherBackups()); std::transform(backups.begin(), backups.end(), inserter(members, members.begin()), @@ -102,7 +100,6 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) : assert(result.second); txQueue = result.first; txQueue->deliver(TxMembersEvent(members).message()); - // Do this last, it will start concurrent callbacks. } PrimaryTxObserver::~PrimaryTxObserver() {} |
