diff options
| author | Alan Conway <aconway@apache.org> | 2012-12-19 22:19:52 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-12-19 22:19:52 +0000 |
| commit | dad6d3eaac2ddb87ae0071b868e59112b2d5325b (patch) | |
| tree | 13536510244ffc9914b2ff7618cf2a346580a051 /qpid/cpp/src | |
| parent | f58ea784ad1930c6b8bbf1b9a9e32bf3575c4bed (diff) | |
| download | qpid-python-dad6d3eaac2ddb87ae0071b868e59112b2d5325b.tar.gz | |
QPID-4516: Sporadic failure in ha_tests test_failover_send_receive
Sporadic failures in ha_tests.py test_failover_send_receive. Two types of failure were observed:
- core dumps in a debug build at a C++ assertion
- python test assertion like: AssertionError: Broker<137:cluster0-0.log qpidd-157 :35273> expected='ready', actual='catchup'
The following fixes were made to correct the problem:
- Add missing break statement in switch.
- Remove unused function HaBroker::resetMembership
- Abort connection of timed-out backups so they can attempt to reconnect.
- New primary resets membership before allowing backups to connect.
- Remove incorrect demotion ready->catchup on timeout.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1424169 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 20 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 14 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.h | 9 |
8 files changed, 31 insertions, 32 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 3024656daa..6317520a56 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -75,7 +75,7 @@ void Backup::initialize(const Url& brokers) { } Backup::~Backup() { - QPID_LOG(debug, logPrefix << "Backup shutting down."); + QPID_LOG(debug, logPrefix << "No longer a backup."); if (link) link->close(); if (replicator.get()) { broker.getExchanges().destroy(replicator->getName()); @@ -106,7 +106,9 @@ void Backup::setStatus(BrokerStatus status) { break; case CATCHUP: QPID_LOG(notice, logPrefix << "Catching up on primary, cannot be promoted."); + break; default: + // FIXME aconway 2012-12-07: fail assert(0); } } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 8f3eb3bf90..fc13314d8f 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -848,7 +848,7 @@ namespace { } void BrokerReplicator::disconnected() { - QPID_LOG(info, logPrefix << "Disconnected"); + QPID_LOG(info, logPrefix << "Disconnected from " << primary); connection = 0; // Clean up auto-delete queues vector<boost::shared_ptr<Exchange> > collect; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 8c16a5ea38..8c9669f8f5 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -134,8 +134,12 @@ HaBroker::~HaBroker() { // Called from ManagementMethod on promote. void HaBroker::recover() { boost::shared_ptr<Backup> b; - { + BrokerInfo::Set backups; + { Mutex::ScopedLock l(lock); + // Reset membership before allowing backups to connect. + backups = membership.otherBackups(); + membership.reset(brokerInfo); // No longer replicating, close link. Note: link must be closed before we // setStatus(RECOVERING) as that will remove our broker info from the // outgoing link properties so we won't recognize self-connects. 
@@ -143,12 +147,9 @@ void HaBroker::recover() { backup.reset(); // Reset in lock. } b.reset(); // Call destructor outside of lock. - BrokerInfo::Set backups; - { + { Mutex::ScopedLock l(lock); setStatus(RECOVERING, l); - backups = membership.otherBackups(); - membership.reset(brokerInfo); // Drop the lock, new Primary may call back on activate. } // Outside of lock, may call back on activate() @@ -287,12 +288,12 @@ void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) { QPID_LOG(info, logPrefix << "Status change: " << printable(status) << " -> " << printable(newStatus)); bool legal = checkTransition(status, newStatus); - assert(legal); if (!legal) { QPID_LOG(critical, logPrefix << "Illegal state transition: " << printable(status) << " -> " << printable(newStatus)); shutdown(); } + assert(legal); // FIXME aconway 2012-12-07: fail status = newStatus; statusChanged(l); } @@ -328,13 +329,6 @@ void HaBroker::setMembership(const Variant::List& brokers) { if (b) b->setStatus(status); // Oustside lock, avoid deadlock } -void HaBroker::resetMembership(const BrokerInfo& b) { - Mutex::ScopedLock l(lock); - membership.reset(b); - QPID_LOG(debug, logPrefix << "Membership reset to: " << membership); - membershipUpdated(l); -} - void HaBroker::addBroker(const BrokerInfo& b) { Mutex::ScopedLock l(lock); membership.add(b); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 76dbf57a0c..92eb2e078f 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -93,7 +93,6 @@ class HaBroker : public management::Manageable const BrokerInfo& getBrokerInfo() const { return brokerInfo; } void setMembership(const types::Variant::List&); // Set membership from list. - void resetMembership(const BrokerInfo& b); // Reset to contain just one member. void addBroker(const BrokerInfo& b); // Add a broker to the membership. void removeBroker(const types::Uuid& id); // Remove a broker from membership. 
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 6d5d68191b..40ae9ff07b 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -96,7 +96,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect); for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(*i, haBroker.getReplicationTest(), false)); + new RemoteBackup(*i, haBroker.getReplicationTest(), 0)); backups[i->getSystemId()] = backup; if (!backup->isReady()) expectedBackups.insert(backup); backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards @@ -161,9 +161,6 @@ void Primary::timeoutExpectedBackups() { expectedBackups.erase(i++); backups.erase(info.getSystemId()); rb->cancel(); - // Downgrade the broker to CATCHUP - info.setStatus(CATCHUP); - haBroker.addBroker(info); } else ++i; } @@ -228,7 +225,7 @@ void Primary::opened(broker::Connection& connection) { if (i == backups.end()) { QPID_LOG(info, logPrefix << "New backup connected: " << info); boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(info, haBroker.getReplicationTest(), true)); + new RemoteBackup(info, haBroker.getReplicationTest(), &connection)); { // Avoid deadlock with queue registry lock. 
Mutex::ScopedUnlock u(lock); @@ -238,7 +235,7 @@ void Primary::opened(broker::Connection& connection) { } else { QPID_LOG(info, logPrefix << "Known backup connected: " << info); - i->second->setConnected(true); + i->second->setConnection(&connection); checkReady(i, l); } if (info.getStatus() == JOINING) info.setStatus(CATCHUP); diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index c713115176..08e92d5673 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -98,8 +98,8 @@ class Primary bool active; /** * Set of expected backups that must be ready before we declare ourselves - * active. These are backups that were known before the primary crashed. As - * new primary we expect them to re-connect. + * active. These are backups that were known and ready before the primary + * crashed. As new primary we expect them to re-connect. */ BackupSet expectedBackups; /** diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index b933c71bbb..6829737f29 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -21,6 +21,7 @@ #include "RemoteBackup.h" #include "QueueGuard.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/log/Statement.h" @@ -32,9 +33,10 @@ namespace ha { using sys::Mutex; using boost::bind; -RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) : - logPrefix("Primary: Remote backup "+info.getLogId()+": "), - brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false) +RemoteBackup::RemoteBackup( + const BrokerInfo& info, ReplicationTest rt, broker::Connection* c +) : logPrefix("Primary: Remote backup "+info.getLogId()+": "), + brokerInfo(info), replicationTest(rt), connection(c), reportedReady(false) {} void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool 
createGuards) @@ -49,10 +51,14 @@ void RemoteBackup::cancel() { for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i) i->second->cancel(); guards.clear(); + if (connection) { + connection->abort(); + connection = 0; + } } bool RemoteBackup::isReady() { - return connected && catchupQueues.empty(); + return connection && catchupQueues.empty(); } void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) { diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h index e48ceff3ae..a65d916432 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.h +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h @@ -33,6 +33,7 @@ namespace qpid { namespace broker { class Queue; class QueueRegistry; +class Connection; } namespace ha { @@ -54,7 +55,7 @@ class RemoteBackup /** Note: isReady() can be true after construction *@param connected true if the backup is already connected. */ - RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected); + RemoteBackup(const BrokerInfo&, ReplicationTest, broker::Connection*); ~RemoteBackup(); /** Set all queues in the registry as catch-up queues. @@ -66,8 +67,8 @@ class RemoteBackup GuardPtr guard(const QueuePtr&); /** Is the remote backup connected? */ - void setConnected(bool b) { connected=b; } - bool isConnected() const { return connected; } + void setConnection(broker::Connection* c) { connection = c; } + bool isConnected() const { return connection; } /** ReplicatingSubscription associated with queue is ready. * Note: may set isReady() @@ -101,7 +102,7 @@ class RemoteBackup ReplicationTest replicationTest; GuardMap guards; QueueSet catchupQueues; - bool connected; + broker::Connection* connection; bool reportedReady; }; |
