summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-12-19 22:19:52 +0000
committerAlan Conway <aconway@apache.org>2012-12-19 22:19:52 +0000
commitdad6d3eaac2ddb87ae0071b868e59112b2d5325b (patch)
tree13536510244ffc9914b2ff7618cf2a346580a051 /qpid/cpp/src
parentf58ea784ad1930c6b8bbf1b9a9e32bf3575c4bed (diff)
downloadqpid-python-dad6d3eaac2ddb87ae0071b868e59112b2d5325b.tar.gz
QPID-4516: Sporadic failure in ha_tests test_failover_send_receive
Sporadic failures in ha_tests.py test_failover_send_receive. Two types of failure observed: - core dumps in a debug build at a C++ assertion - python test assertion like: AssertionError: Broker<137:cluster0-0.log qpidd-157 :35273> expected='ready', actual='catchup' The following fixes were made to correct the problem: - Missing break statement in switch. - Remove unused function HaBroker::resetMembership - Abort connection of timed-out backups so they can attempt to reconnect. - New primary resets membership before allowing backups to connect. - Remove incorrect demotion ready->catchup on timeout. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1424169 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp20
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h1
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp9
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h4
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.h9
8 files changed, 31 insertions, 32 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 3024656daa..6317520a56 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -75,7 +75,7 @@ void Backup::initialize(const Url& brokers) {
}
Backup::~Backup() {
- QPID_LOG(debug, logPrefix << "Backup shutting down.");
+ QPID_LOG(debug, logPrefix << "No longer a backup.");
if (link) link->close();
if (replicator.get()) {
broker.getExchanges().destroy(replicator->getName());
@@ -106,7 +106,9 @@ void Backup::setStatus(BrokerStatus status) {
break;
case CATCHUP:
QPID_LOG(notice, logPrefix << "Catching up on primary, cannot be promoted.");
+ break;
default:
+ // FIXME aconway 2012-12-07: fail
assert(0);
}
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 8f3eb3bf90..fc13314d8f 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -848,7 +848,7 @@ namespace {
}
void BrokerReplicator::disconnected() {
- QPID_LOG(info, logPrefix << "Disconnected");
+ QPID_LOG(info, logPrefix << "Disconnected from " << primary);
connection = 0;
// Clean up auto-delete queues
vector<boost::shared_ptr<Exchange> > collect;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 8c16a5ea38..8c9669f8f5 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -134,8 +134,12 @@ HaBroker::~HaBroker() {
// Called from ManagementMethod on promote.
void HaBroker::recover() {
boost::shared_ptr<Backup> b;
- {
+ BrokerInfo::Set backups;
+ {
Mutex::ScopedLock l(lock);
+ // Reset membership before allowing backups to connect.
+ backups = membership.otherBackups();
+ membership.reset(brokerInfo);
// No longer replicating, close link. Note: link must be closed before we
// setStatus(RECOVERING) as that will remove our broker info from the
// outgoing link properties so we won't recognize self-connects.
@@ -143,12 +147,9 @@ void HaBroker::recover() {
backup.reset(); // Reset in lock.
}
b.reset(); // Call destructor outside of lock.
- BrokerInfo::Set backups;
- {
+ {
Mutex::ScopedLock l(lock);
setStatus(RECOVERING, l);
- backups = membership.otherBackups();
- membership.reset(brokerInfo);
// Drop the lock, new Primary may call back on activate.
}
// Outside of lock, may call back on activate()
@@ -287,12 +288,12 @@ void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
QPID_LOG(info, logPrefix << "Status change: "
<< printable(status) << " -> " << printable(newStatus));
bool legal = checkTransition(status, newStatus);
- assert(legal);
if (!legal) {
QPID_LOG(critical, logPrefix << "Illegal state transition: "
<< printable(status) << " -> " << printable(newStatus));
shutdown();
}
+ assert(legal); // FIXME aconway 2012-12-07: fail
status = newStatus;
statusChanged(l);
}
@@ -328,13 +329,6 @@ void HaBroker::setMembership(const Variant::List& brokers) {
if (b) b->setStatus(status); // Oustside lock, avoid deadlock
}
-void HaBroker::resetMembership(const BrokerInfo& b) {
- Mutex::ScopedLock l(lock);
- membership.reset(b);
- QPID_LOG(debug, logPrefix << "Membership reset to: " << membership);
- membershipUpdated(l);
-}
-
void HaBroker::addBroker(const BrokerInfo& b) {
Mutex::ScopedLock l(lock);
membership.add(b);
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 76dbf57a0c..92eb2e078f 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -93,7 +93,6 @@ class HaBroker : public management::Manageable
const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
void setMembership(const types::Variant::List&); // Set membership from list.
- void resetMembership(const BrokerInfo& b); // Reset to contain just one member.
void addBroker(const BrokerInfo& b); // Add a broker to the membership.
void removeBroker(const types::Uuid& id); // Remove a broker from membership.
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 6d5d68191b..40ae9ff07b 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -96,7 +96,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(*i, haBroker.getReplicationTest(), false));
+ new RemoteBackup(*i, haBroker.getReplicationTest(), 0));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
@@ -161,9 +161,6 @@ void Primary::timeoutExpectedBackups() {
expectedBackups.erase(i++);
backups.erase(info.getSystemId());
rb->cancel();
- // Downgrade the broker to CATCHUP
- info.setStatus(CATCHUP);
- haBroker.addBroker(info);
}
else ++i;
}
@@ -228,7 +225,7 @@ void Primary::opened(broker::Connection& connection) {
if (i == backups.end()) {
QPID_LOG(info, logPrefix << "New backup connected: " << info);
boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(info, haBroker.getReplicationTest(), true));
+ new RemoteBackup(info, haBroker.getReplicationTest(), &connection));
{
// Avoid deadlock with queue registry lock.
Mutex::ScopedUnlock u(lock);
@@ -238,7 +235,7 @@ void Primary::opened(broker::Connection& connection) {
}
else {
QPID_LOG(info, logPrefix << "Known backup connected: " << info);
- i->second->setConnected(true);
+ i->second->setConnection(&connection);
checkReady(i, l);
}
if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index c713115176..08e92d5673 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -98,8 +98,8 @@ class Primary
bool active;
/**
* Set of expected backups that must be ready before we declare ourselves
- * active. These are backups that were known before the primary crashed. As
- * new primary we expect them to re-connect.
+ * active. These are backups that were known and ready before the primary
+ * crashed. As new primary we expect them to re-connect.
*/
BackupSet expectedBackups;
/**
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index b933c71bbb..6829737f29 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -21,6 +21,7 @@
#include "RemoteBackup.h"
#include "QueueGuard.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/log/Statement.h"
@@ -32,9 +33,10 @@ namespace ha {
using sys::Mutex;
using boost::bind;
-RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
- logPrefix("Primary: Remote backup "+info.getLogId()+": "),
- brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
+RemoteBackup::RemoteBackup(
+ const BrokerInfo& info, ReplicationTest rt, broker::Connection* c
+) : logPrefix("Primary: Remote backup "+info.getLogId()+": "),
+ brokerInfo(info), replicationTest(rt), connection(c), reportedReady(false)
{}
void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards)
@@ -49,10 +51,14 @@ void RemoteBackup::cancel() {
for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i)
i->second->cancel();
guards.clear();
+ if (connection) {
+ connection->abort();
+ connection = 0;
+ }
}
bool RemoteBackup::isReady() {
- return connected && catchupQueues.empty();
+ return connection && catchupQueues.empty();
}
void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index e48ceff3ae..a65d916432 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -33,6 +33,7 @@ namespace qpid {
namespace broker {
class Queue;
class QueueRegistry;
+class Connection;
}
namespace ha {
@@ -54,7 +55,7 @@ class RemoteBackup
/** Note: isReady() can be true after construction
*@param connected true if the backup is already connected.
*/
- RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected);
+ RemoteBackup(const BrokerInfo&, ReplicationTest, broker::Connection*);
~RemoteBackup();
/** Set all queues in the registry as catch-up queues.
@@ -66,8 +67,8 @@ class RemoteBackup
GuardPtr guard(const QueuePtr&);
/** Is the remote backup connected? */
- void setConnected(bool b) { connected=b; }
- bool isConnected() const { return connected; }
+ void setConnection(broker::Connection* c) { connection = c; }
+ bool isConnected() const { return connection; }
/** ReplicatingSubscription associated with queue is ready.
* Note: may set isReady()
@@ -101,7 +102,7 @@ class RemoteBackup
ReplicationTest replicationTest;
GuardMap guards;
QueueSet catchupQueues;
- bool connected;
+ broker::Connection* connection;
bool reportedReady;
};