summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-09-03 16:07:20 +0000
committerAlan Conway <aconway@apache.org>2013-09-03 16:07:20 +0000
commit7ee553fc7fd90d55b672507fdfccadbb6d2a76c2 (patch)
tree592bcba30ad0299505cadc60741860c90472aa2a /qpid/cpp/src
parent352ea7a718a4ab1707a2448e33512b7a00ec905b (diff)
downloadqpid-python-7ee553fc7fd90d55b672507fdfccadbb6d2a76c2.tar.gz
NO-JIRA: HA minor cleanup of disconnect logic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1519738 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp51
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h4
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp5
3 files changed, 21 insertions, 39 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 36bf89fb81..e7a0218dd8 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -878,54 +878,41 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
-void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
+void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) {
boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
- if (!qr) return;
- assert(qr);
- if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
- if (qr->getQueue()->getSettings().autoDeleteDelay) {
- // Start the auto-delete timer
- qr->getQueue()->releaseFromUse();
- qr->getQueue()->scheduleAutoDelete();
+ if (qr) {
+ qr->disconnect();
+ if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
+ // Transactions are aborted on failover so clean up tx-queues
+ deleteQueue(qr->getQueue()->getName());
}
- else {
- // Delete immediately. Don't purge, the primary is gone so we need
- // to reroute the deleted messages.
- deleteQueue(qr->getQueue()->getName(), false);
+ else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
+ if (qr->getQueue()->getSettings().autoDeleteDelay) {
+ // Start the auto-delete timer
+ qr->getQueue()->releaseFromUse();
+ qr->getQueue()->scheduleAutoDelete();
+ }
+ else {
+ // Delete immediately. Don't purge, the primary is gone so we need
+ // to reroute the deleted messages.
+ deleteQueue(qr->getQueue()->getName(), false);
+ }
}
}
}
typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
-typedef vector<boost::shared_ptr<Queue> > QueueVector;
// Called by ConnectionObserver::disconnected, disconnected from the network side.
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected from primary " << primary);
connection = 0;
- // Make copys of queues & exchanges so we can work outside the registry lock.
-
+ // Make copy of exchanges so we can work outside the registry lock.
ExchangeVector exs;
exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, &exs, _1));
for_each(exs.begin(), exs.end(),
- boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
-
- QueueVector qs;
- queues.eachQueue(boost::bind(&QueueVector::push_back, &qs, _1));
- for_each(qs.begin(), qs.end(),
- boost::bind(&BrokerReplicator::disconnectedQueue, this, _1));
-}
-
-// Called for queues existing when the backup is disconnected.
-void BrokerReplicator::disconnectedQueue(const boost::shared_ptr<Queue>& q) {
- QPID_LOG(critical, "BrokerReplicator::disconnectedQueue" << q->getName());
- boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(q->getName());
- if (qr) {
- qr->disconnect();
- if (TxReplicator::isTxQueue(q->getName()))
- deleteQueue(q->getName());
- }
+ boost::bind(&BrokerReplicator::disconnectedExchange, this, _1));
}
void BrokerReplicator::setMembership(const Variant::List& brokers) {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 8045c2a91f..395f0706d9 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -104,7 +104,6 @@ class BrokerReplicator : public broker::Exchange,
void connected(broker::Bridge&, broker::SessionHandler&);
void existingQueue(const boost::shared_ptr<broker::Queue>&);
void existingExchange(const boost::shared_ptr<broker::Exchange>&);
- void disconnectedQueue(const boost::shared_ptr<broker::Queue>&);
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
@@ -140,8 +139,7 @@ class BrokerReplicator : public broker::Exchange,
void deleteQueue(const std::string& name, bool purge=true);
void deleteExchange(const std::string& name);
- void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
-
+ void disconnectedExchange(boost::shared_ptr<broker::Exchange>);
void disconnected();
void setMembership(const types::Variant::List&); // Set membership from list.
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index f13edfb31e..832d10483e 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -83,9 +83,7 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
logPrefix = "Primary transaction "+shortStr(id)+": ";
// The brokers known at this point are the ones that will be included
- // in the transaction. Brokers that join later are not included
- // Latecomers that have replicated the transaction will be rolled back
- // when the tx-queue is deleted.
+ // in the transaction. Brokers that join later are not included.
//
BrokerInfo::Set backups(haBroker.getMembership().otherBackups());
std::transform(backups.begin(), backups.end(), inserter(members, members.begin()),
@@ -102,7 +100,6 @@ PrimaryTxObserver::PrimaryTxObserver(HaBroker& hb) :
assert(result.second);
txQueue = result.first;
txQueue->deliver(TxMembersEvent(members).message());
- // Do this last, it will start concurrent callbacks.
}
PrimaryTxObserver::~PrimaryTxObserver() {}