summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp100
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h20
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.h3
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp43
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp227
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h49
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp110
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.h45
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp26
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h10
-rw-r--r--qpid/cpp/src/qpid/ha/Role.h55
-rw-r--r--qpid/cpp/src/qpid/ha/StandAlone.h45
14 files changed, 442 insertions, 298 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 6317520a56..2aabf6342b 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -20,9 +20,12 @@
*/
#include "Backup.h"
#include "BrokerReplicator.h"
+#include "ConnectionObserver.h"
#include "HaBroker.h"
+#include "Primary.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "StatusCheck.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
@@ -44,28 +47,38 @@ using namespace framing;
using namespace broker;
using types::Variant;
using std::string;
+using sys::Mutex;
Backup::Backup(HaBroker& hb, const Settings& s) :
- logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
+ logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
+ haBroker(hb), broker(hb.getBroker()), settings(s),
+ statusCheck(
+ new StatusCheck(
+ logPrefix, broker.getLinkHearbeatInterval(), hb.getBrokerInfo()))
{
- // Empty brokerUrl means delay initialization until seBrokertUrl() is called.
- if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
+ // Set link properties to tag outgoing links.
+ framing::FieldTable linkProperties = broker.getLinkClientProperties();
+ linkProperties.setTable(
+ ConnectionObserver::BACKUP_TAG, hb.getBrokerInfo().asFieldTable());
+ broker.setLinkClientProperties(linkProperties);
}
-void Backup::initialize(const Url& brokers) {
- if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
- string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
- types::Uuid uuid(true);
- // Declare the link
- std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
- broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
- brokers[0].host, brokers[0].port, protocol,
- false, // durable
- settings.mechanism, settings.username, settings.password,
- false); // no amq.failover - don't want to use client URL.
- {
- sys::Mutex::ScopedLock l(lock);
+void Backup::setBrokerUrl(const Url& brokers) {
+ if (brokers.empty()) return;
+ Mutex::ScopedLock l(lock);
+ if (stopped) return;
+ if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers);
+ if (!link) { // Not yet initialized
+ QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
+ string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
+ types::Uuid uuid(true);
+ std::pair<Link::shared_ptr, bool> result;
+ result = broker.getLinks().declare(
+ broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
+ brokers[0].host, brokers[0].port, protocol,
+ false, // durable
+ settings.mechanism, settings.username, settings.password,
+ false); // no amq.failover - don't want to use client URL.
link = result.first;
replicator.reset(new BrokerReplicator(haBroker, link));
replicator->initialize();
@@ -74,8 +87,9 @@ void Backup::initialize(const Url& brokers) {
link->setUrl(brokers); // Outside the lock, once set link doesn't change.
}
-Backup::~Backup() {
- QPID_LOG(debug, logPrefix << "No longer a backup.");
+void Backup::stop(Mutex::ScopedLock&) {
+ if (stopped) return;
+ QPID_LOG(debug, logPrefix << "Leaving backup role.");
if (link) link->close();
if (replicator.get()) {
broker.getExchanges().destroy(replicator->getName());
@@ -84,33 +98,45 @@ Backup::~Backup() {
}
}
-// Called via management.
-void Backup::setBrokerUrl(const Url& url) {
- // Ignore empty URLs seen during start-up for some tests.
- if (url.empty()) return;
- bool linkSet = false;
+Role* Backup::recover(Mutex::ScopedLock&) {
+ BrokerInfo::Set backups;
{
- sys::Mutex::ScopedLock l(lock);
- linkSet = link;
+ Mutex::ScopedLock l(lock);
+ if (stopped) return 0;
+ stop(l); // Stop backup activity before starting primary.
+ QPID_LOG(notice, "Promoting to primary: " << haBroker.getBrokerInfo());
+ // Reset membership before allowing backups to connect.
+ backups = membership.otherBackups();
+ membership.clear();
+ return new Primary(haBroker, backups);
}
- if (linkSet)
- link->setUrl(url); // Outside lock, once set link doesn't change
- else
- initialize(url); // Deferred initialization
}
-void Backup::setStatus(BrokerStatus status) {
- switch (status) {
- case READY:
- QPID_LOG(notice, logPrefix << "Ready to become primary.");
+Role* Backup::promote() {
+ Mutex::ScopedLock l(lock);
+ if (stopped) return 0;
+ switch (haBroker.getStatus()) {
+ case JOINING:
+ if (statusCheck->canPromote()) return recover(l);
+ else {
+ QPID_LOG(error,
+ logPrefix << "Joining active cluster, cannot be promoted.");
+ throw Exception("Joining active cluster, cannot be promoted.");
+ }
break;
case CATCHUP:
- QPID_LOG(notice, logPrefix << "Catching up on primary, cannot be promoted.");
+ QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+ throw Exception("Still catching up, cannot be promoted.");
break;
+ case READY: return recover(l); break;
default:
- // FIXME aconway 2012-12-07: fail
- assert(0);
+ assert(0); // Not a valid state for the Backup role..
}
}
+Backup::~Backup() {
+ Mutex::ScopedLock l(lock);
+ stop(l);
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 4f2d5babde..4943ca5e2e 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -22,6 +22,7 @@
*
*/
+#include "Role.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
@@ -38,30 +39,41 @@ namespace ha {
class Settings;
class BrokerReplicator;
class HaBroker;
+class StatusCheck;
+class Membership;
/**
- * State associated with a backup broker. Manages connections to primary.
+ * Backup role: Manages connections to primary, replicates management events and queue contents.
*
* THREAD SAFE
*/
-class Backup
+class Backup : public Role
{
public:
Backup(HaBroker&, const Settings&);
~Backup();
+
+ std::string getLogPrefix() const { return logPrefix; }
+
void setBrokerUrl(const Url&);
- void setStatus(BrokerStatus);
+
+ Role* promote();
private:
- void initialize(const Url&);
+ void stop(sys::Mutex::ScopedLock&);
+ Role* recover(sys::Mutex::ScopedLock&);
+
std::string logPrefix;
+ Membership& membership;
sys::Mutex lock;
+ bool stopped;
HaBroker& haBroker;
broker::Broker& broker;
Settings settings;
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<BrokerReplicator> replicator;
+ std::auto_ptr<StatusCheck> statusCheck;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
index 5a8dfa512a..a0c4af88ca 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -45,8 +45,9 @@ using framing::FieldTable;
BrokerInfo::BrokerInfo() : port(0), status(JOINING) {}
-BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
- hostName(host), port(port_), systemId(id), status(JOINING)
+BrokerInfo::BrokerInfo(const types::Uuid& id, BrokerStatus s,
+ const std::string& host, uint16_t port_) :
+ hostName(host), port(port_), systemId(id), status(s)
{
updateLogId();
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h
index 5131e23be2..6142e03f98 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.h
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h
@@ -44,7 +44,8 @@ class BrokerInfo
typedef std::map<types::Uuid, BrokerInfo> Map;
BrokerInfo();
- BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id);
+ BrokerInfo(const types::Uuid& id, BrokerStatus,
+ const std::string& host=std::string(), uint16_t port=0);
BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
BrokerInfo(const types::Variant::Map& m) { assign(m); }
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index f51a69e461..37c2a2d6b4 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -227,7 +227,9 @@ class BrokerReplicator::UpdateTracker {
typedef std::set<std::string> Names;
typedef boost::function<void (const std::string&)> CleanFn;
- UpdateTracker(CleanFn f, const ReplicationTest& rt) : cleanFn(f), repTest(rt) {}
+ UpdateTracker(const std::string& type_, // "queue" or "exchange"
+ CleanFn f, const ReplicationTest& rt)
+ : type(type_), cleanFn(f), repTest(rt) {}
/** Destructor cleans up remaining initial queues. */
~UpdateTracker() {
@@ -264,6 +266,12 @@ class BrokerReplicator::UpdateTracker {
}
private:
+ void clean(const std::string& name) {
+ QPID_LOG(info, "Backup updated, deleting " << type << " " << name);
+ cleanFn(name);
+ }
+
+ std::string type;
Names initial, events;
CleanFn cleanFn;
ReplicationTest repTest;
@@ -353,13 +361,15 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
initialized = true;
exchangeTracker.reset(
- new UpdateTracker(boost::bind(&BrokerReplicator::deleteExchange, this, _1),
+ new UpdateTracker("exchange",
+ boost::bind(&BrokerReplicator::deleteExchange, this, _1),
replicationTest));
exchanges.eachExchange(
boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
queueTracker.reset(
- new UpdateTracker(boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
+ new UpdateTracker("queue",
+ boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
replicationTest));
queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
@@ -394,7 +404,7 @@ void BrokerReplicator::route(Deliverable& msg) {
// We transition from JOINING->CATCHUP on the first message received from the primary.
// Until now we couldn't be sure if we had a good connection to the primary.
if (haBroker.getStatus() == JOINING) {
- haBroker.setStatus(CATCHUP);
+ haBroker.getMembership().setStatus(CATCHUP);
QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
}
Variant::List list;
@@ -572,7 +582,7 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
Variant::List members = values[MEMBERS].asList();
- haBroker.setMembership(members);
+ setMembership(members);
}
void BrokerReplicator::doEventSubscribe(Variant::Map& values) {
@@ -724,7 +734,7 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
<< ") does not match primary (" << primary << ")"));
- haBroker.setMembership(values[MEMBERS].asList());
+ setMembership(values[MEMBERS].asList());
} catch (const std::exception& e) {
haBroker.shutdown(
QPID_MSG(logPrefix << "Invalid HA Broker response: " << e.what()
@@ -861,4 +871,25 @@ void BrokerReplicator::disconnected() {
boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
}
+void BrokerReplicator::setMembership(const Variant::List& brokers) {
+ Membership& membership(haBroker.getMembership());
+ membership.assign(brokers);
+ // Check if the primary has signalled a change in my status:
+ // from CATCHUP to READY when we are caught up.
+ // from READY TO CATCHUP if we are timed out during fail-over.
+ BrokerInfo info;
+ if (membership.get(membership.getSelf(), info)) {
+ BrokerStatus oldStatus = haBroker.getStatus();
+ BrokerStatus newStatus = info.getStatus();
+ if (oldStatus == CATCHUP && newStatus == READY) {
+ QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready");
+ haBroker.getMembership().setStatus(READY);
+ }
+ else if (oldStatus == READY && newStatus == CATCHUP) {
+ QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up");
+ haBroker.getMembership().setStatus(CATCHUP);
+ }
+ }
+}
+
}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 9134163575..9161227c0f 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -136,6 +136,8 @@ class BrokerReplicator : public broker::Exchange,
void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
void disconnected();
+ void setMembership(const types::Variant::List&); // Set membership from list.
+
std::string logPrefix;
std::string userId, remoteHost;
ReplicationTest replicationTest;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 0fd8c9e518..c4cb640f97 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -26,7 +26,7 @@
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
-#include "StatusCheck.h"
+#include "StandAlone.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
@@ -42,7 +42,6 @@
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
-#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
@@ -56,24 +55,23 @@ using types::Variant;
using types::Uuid;
using sys::Mutex;
using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
// Called in Plugin::earlyInitialize
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
- : broker(b),
- systemId(broker.getSystem()->getSystemId().data()),
+ : systemId(b.getSystem()->getSystemId().data()),
settings(s),
+ replicationTest(s.replicateDefault.get()),
+ broker(b),
observer(new ConnectionObserver(*this, systemId)),
- status(STANDALONE),
- logPrefix("Broker: "),
- membership(systemId),
- replicationTest(s.replicateDefault.get())
+ role(new StandAlone),
+ membership(BrokerInfo(systemId, STANDALONE), *this)
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- status = JOINING;
- QPID_LOG(debug, logPrefix << "Rejecting client connections.");
+ QPID_LOG(debug, role->getLogPrefix() << "Rejecting client connections.");
shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
observer->setObserver(excluder, "Backup: ");
broker.getConnectionObservers().add(observer);
@@ -87,13 +85,16 @@ bool isNone(const std::string& x) { return x.empty() || x == NONE; }
// Called in Plugin::initialize
void HaBroker::initialize() {
-
// FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
- brokerInfo = BrokerInfo(
- broker.getSystem()->getNodeName(),
- broker.getPort(broker::Broker::TCP_TRANSPORT),
- systemId);
- QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
+ membership.add(
+ BrokerInfo(
+ membership.getSelf(),
+ settings.cluster ? JOINING : membership.getStatus(),
+ broker.getSystem()->getNodeName(),
+ broker.getPort(broker::Broker::TCP_TRANSPORT)
+ )
+ );
+ QPID_LOG(notice, role->getLogPrefix() << "Initializing: " << membership.getInfo());
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -104,90 +105,34 @@ void HaBroker::initialize() {
mgmtObject->set_replicateDefault(settings.replicateDefault.str());
mgmtObject->set_systemId(systemId);
ma->addObject(mgmtObject);
+ membership.setMgmtObject(mgmtObject);
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
- boost::shared_ptr<ReplicatingSubscription::Factory>(
+ shared_ptr<ReplicatingSubscription::Factory>(
new ReplicatingSubscription::Factory()));
// If we are in a cluster, start as backup in joining state.
if (settings.cluster) {
- status = JOINING;
- backup.reset(new Backup(*this, settings));
+ assert(membership.getStatus() == JOINING);
+ role.reset(new Backup(*this, settings));
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
- statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo));
if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
}
-
-
- // NOTE: lock is not needed in a constructor, but create one
- // to pass to functions that have a ScopedLock parameter.
- Mutex::ScopedLock l(lock);
- statusChanged(l);
}
HaBroker::~HaBroker() {
- QPID_LOG(notice, logPrefix << "Shut down");
+ QPID_LOG(notice, role->getLogPrefix() << "Shut down");
broker.getConnectionObservers().remove(observer);
}
-// Called from ManagementMethod on promote.
-void HaBroker::recover() {
- boost::shared_ptr<Backup> b;
- BrokerInfo::Set backups;
- {
- Mutex::ScopedLock l(lock);
- if (isPrimary(status)) {
- QPID_LOG(info, "Ignoring promotion, already primary: " << brokerInfo);
- return;
- }
- QPID_LOG(notice, "Promoting to primary: " << brokerInfo);
- // Reset membership before allowing backups to connect.
- backups = membership.otherBackups();
- membership.reset(brokerInfo);
- // No longer replicating, close link. Note: link must be closed before we
- // setStatus(RECOVERING) as that will remove our broker info from the
- // outgoing link properties so we won't recognize self-connects.
- b = backup;
- backup.reset(); // Reset in lock.
- }
- b.reset(); // Call destructor outside of lock.
- {
- Mutex::ScopedLock l(lock);
- setStatus(RECOVERING, l);
- // Drop the lock, new Primary may call back on activate.
- }
- // Outside of lock, may call back on activate()
- primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
-}
-
-// Called back from Primary active check.
-void HaBroker::activate() { setStatus(ACTIVE); }
-
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
- switch (getStatus()) {
- case JOINING:
- if (statusCheck->canPromote())
- recover();
- else {
- QPID_LOG(error,
- logPrefix << "Joining active cluster, cannot be promoted.");
- throw Exception("Cluster already active, cannot be promoted.");
- }
- break;
- case CATCHUP:
- QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
- throw Exception("Still catching up, cannot be promoted.");
- break;
- case READY: recover(); break;
- case RECOVERING: break;
- case ACTIVE: break;
- case STANDALONE: break;
- }
- break;
+ Role* r = role->promote();
+ if (r) role.reset(r);
+ break;
}
case _qmf::HaBroker::METHOD_SETBROKERSURL: {
setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url));
@@ -200,10 +145,10 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, logPrefix << "Replicate individual queue "
+ QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
- boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+ shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
Uuid uuid(true);
@@ -213,10 +158,10 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
false, // durable
settings.mechanism, settings.username, settings.password,
false); // no amq.failover - don't want to use client URL.
- boost::shared_ptr<broker::Link> link = result.first;
+ shared_ptr<broker::Link> link = result.first;
link->setUrl(url);
// Create a queue replicator
- boost::shared_ptr<QueueReplicator> qr(
+ shared_ptr<QueueReplicator> qr(
new QueueReplicator(*this, queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
@@ -235,20 +180,17 @@ void HaBroker::setPublicUrl(const Url& url) {
mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, logPrefix << "Setting public URL to: " << url);
+ QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
- boost::shared_ptr<Backup> b;
{
Mutex::ScopedLock l(lock);
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
- if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url);
- b = backup;
+ QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
}
- if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock
+ role->setBrokerUrl(url); // Oustside lock
}
std::vector<Url> HaBroker::getKnownBrokers() const {
@@ -263,110 +205,7 @@ void HaBroker::shutdown(const std::string& message) {
}
BrokerStatus HaBroker::getStatus() const {
- return status;
-}
-
-void HaBroker::setStatus(BrokerStatus newStatus) {
- Mutex::ScopedLock l(lock);
- setStatus(newStatus, l);
-}
-
-namespace {
-bool checkTransition(BrokerStatus from, BrokerStatus to) {
- // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
- static const BrokerStatus TRANSITIONS[][2] = {
- { JOINING, CATCHUP }, // Connected to primary
- { JOINING, RECOVERING }, // Chosen as initial primary.
- { CATCHUP, READY }, // Caught up all queues, ready to take over.
- { READY, RECOVERING }, // Chosen as new primary
- { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
- { RECOVERING, ACTIVE } // All expected backups are ready
- };
- static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
- for (size_t i = 0; i < N; ++i) {
- if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
- return true;
- }
- return false;
-}
-} // namespace
-
-void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
- QPID_LOG(info, logPrefix << "Status change: "
- << printable(status) << " -> " << printable(newStatus));
- bool legal = checkTransition(status, newStatus);
- assert(legal);
- if (!legal) {
- shutdown(QPID_MSG(logPrefix << "Illegal state transition: "
- << printable(status) << " -> " << printable(newStatus)));
- }
- status = newStatus;
- statusChanged(l);
-}
-
-void HaBroker::statusChanged(Mutex::ScopedLock& l) {
- mgmtObject->set_status(printable(status).str());
- brokerInfo.setStatus(status);
- membership.add(brokerInfo);
- membershipUpdated(l);
- setLinkProperties(l);
-}
-
-void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
- QPID_LOG(info, logPrefix << "Membership: " << membership);
- Variant::List brokers = membership.asList();
- mgmtObject->set_members(brokers);
- broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
-}
-
-void HaBroker::setMembership(const Variant::List& brokers) {
- boost::shared_ptr<Backup> b;
- {
- Mutex::ScopedLock l(lock);
- membership.assign(brokers);
- BrokerInfo info;
- // Update my status to what the primary says it is. The primary sets
- // status to READY when we are caught up, and sets status to CATCHUP
- // (from READY) if we are timed out during recovery.
- if (membership.get(systemId, info) && status != info.getStatus()) {
- assert((status == CATCHUP && info.getStatus() == READY) ||
- (status == READY && info.getStatus() == CATCHUP));
- setStatus(info.getStatus(), l);
- b = backup;
- }
- membershipUpdated(l);
- }
- if (b) b->setStatus(status); // Oustside lock, avoid deadlock
-}
-
-void HaBroker::addBroker(const BrokerInfo& b) {
- Mutex::ScopedLock l(lock);
- membership.add(b);
- membershipUpdated(l);
-}
-
-void HaBroker::removeBroker(const Uuid& id) {
- Mutex::ScopedLock l(lock);
- BrokerInfo info;
- if (membership.get(id, info)) {
- membership.remove(id);
- membershipUpdated(l);
- }
-}
-
-void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
- framing::FieldTable linkProperties = broker.getLinkClientProperties();
- if (isBackup(status)) {
- // If this is a backup then any outgoing links are backup
- // links and need to be tagged.
- linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
- }
- else {
- // If this is a primary then any outgoing links are federation links
- // and should not be tagged.
- linkProperties.erase(ConnectionObserver::BACKUP_TAG);
- }
- broker.setLinkClientProperties(linkProperties);
+ return membership.getStatus();
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 71e856fbeb..7ba023129c 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -53,12 +53,15 @@ namespace ha {
class Backup;
class ConnectionObserver;
class Primary;
-class StatusCheck;
-
+class Role;
/**
* HA state and actions associated with a HA broker. Holds all the management info.
*
* THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+
+ * NOTE: HaBroker and Role subclasses follow this lock hierarchy:
+ * - HaBroker MUST NOT hold its own lock across calls Role subclasses.
+ * - Role subclasses MAY hold their locks accross calls to HaBroker.
*/
class HaBroker : public management::Manageable
{
@@ -82,53 +85,37 @@ class HaBroker : public management::Manageable
void shutdown(const std::string& message);
BrokerStatus getStatus() const;
- void setStatus(BrokerStatus);
- void activate();
-
- Backup* getBackup() { return backup.get(); }
ReplicationTest getReplicationTest() const { return replicationTest; }
-
boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
- const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
-
- void setMembership(const types::Variant::List&); // Set membership from list.
- void addBroker(const BrokerInfo& b); // Add a broker to the membership.
- void removeBroker(const types::Uuid& id); // Remove a broker from membership.
-
+ BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
+ Membership& getMembership() { return membership; }
types::Uuid getSystemId() const { return systemId; }
private:
+
void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
void updateClientUrl(sys::Mutex::ScopedLock&);
- void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
- void recover();
- void statusChanged(sys::Mutex::ScopedLock&);
- void setLinkProperties(sys::Mutex::ScopedLock&);
-
std::vector<Url> getKnownBrokers() const;
- void membershipUpdated(sys::Mutex::ScopedLock&);
-
- broker::Broker& broker;
- types::Uuid systemId;
+ // Immutable members
+ const types::Uuid systemId;
const Settings settings;
+ // Member variables protected by lock
mutable sys::Mutex lock;
- boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
- boost::shared_ptr<Backup> backup;
- boost::shared_ptr<Primary> primary;
- qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
Url publicUrl, brokerUrl;
std::vector<Url> knownBrokers;
- BrokerStatus status;
- std::string logPrefix;
- BrokerInfo brokerInfo;
- Membership membership;
ReplicationTest replicationTest;
- std::auto_ptr<StatusCheck> statusCheck;
+
+ // Independently thread-safe member variables
+ broker::Broker& broker;
+ qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
+ boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
+ boost::shared_ptr<Role> role;
+ Membership membership;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index 74580f9b1e..d33d57c37f 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -19,6 +19,12 @@
*
*/
#include "Membership.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include <boost/bind.hpp>
#include <iostream>
#include <iterator>
@@ -26,37 +32,57 @@
namespace qpid {
namespace ha {
+namespace _qmf = ::qmf::org::apache::qpid::ha;
-void Membership::reset(const BrokerInfo& b) {
+using sys::Mutex;
+using types::Variant;
+
+Membership::Membership(const BrokerInfo& info, HaBroker& b)
+ : haBroker(b), self(info.getSystemId())
+{
+ brokers[self] = info;
+}
+
+void Membership::clear() {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo me = brokers[self];
brokers.clear();
- brokers[b.getSystemId()] = b;
+ brokers[self] = me;
}
void Membership::add(const BrokerInfo& b) {
+ Mutex::ScopedLock l(lock);
brokers[b.getSystemId()] = b;
+ update(l);
}
void Membership::remove(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
- }
+ update(l);
+ }
}
bool Membership::contains(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
return brokers.find(id) != brokers.end();
}
void Membership::assign(const types::Variant::List& list) {
+ Mutex::ScopedLock l(lock);
brokers.clear();
for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
BrokerInfo b(i->asMap());
brokers[b.getSystemId()] = b;
}
+ update(l);
}
types::Variant::List Membership::asList() const {
+ Mutex::ScopedLock l(lock);
types::Variant::List list;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
list.push_back(i->second.asMap());
@@ -64,6 +90,7 @@ types::Variant::List Membership::asList() const {
}
BrokerInfo::Set Membership::otherBackups() const {
+ Mutex::ScopedLock l(lock);
BrokerInfo::Set result;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (i->second.getStatus() == READY && i->second.getSystemId() != self)
@@ -71,15 +98,84 @@ BrokerInfo::Set Membership::otherBackups() const {
return result;
}
-bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
- BrokerInfo::Map::iterator i = brokers.find(id);
+bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(id);
if (i == brokers.end()) return false;
result = i->second;
return true;
}
-std::ostream& operator<<(std::ostream& o, const Membership& members) {
- return o << members.brokers;
+void Membership::update(Mutex::ScopedLock& l) {
+ QPID_LOG(info, "Membership: " << brokers);
+ Variant::List brokers = asList();
+ if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
+ if (mgmtObject) mgmtObject->set_members(brokers);
+ haBroker.getBroker().getManagementAgent()->raiseEvent(
+ _qmf::EventMembersUpdate(brokers));
+}
+
+void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
+ Mutex::ScopedLock l(lock);
+ mgmtObject = mo;
+ update(l);
+}
+
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+ // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+ static const BrokerStatus TRANSITIONS[][2] = {
+ { STANDALONE, JOINING }, // Initialization of backup broker
+ { JOINING, CATCHUP }, // Connected to primary
+ { JOINING, RECOVERING }, // Chosen as initial primary.
+ { CATCHUP, READY }, // Caught up all queues, ready to take over.
+ { READY, RECOVERING }, // Chosen as new primary
+ { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
+ { RECOVERING, ACTIVE } // All expected backups are ready
+ };
+ static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+ for (size_t i = 0; i < N; ++i) {
+ if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+ return true;
+ }
+ return false;
+}
+} // namespace
+
+void Membership::setStatus(BrokerStatus newStatus) {
+ BrokerStatus status = getStatus();
+ QPID_LOG(info, "Status change: "
+ << printable(status) << " -> " << printable(newStatus));
+ bool legal = checkTransition(status, newStatus);
+ if (!legal) {
+ haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status)
+ << " -> " << printable(newStatus)));
+ }
+
+ Mutex::ScopedLock l(lock);
+ brokers[self].setStatus(newStatus);
+ if (mgmtObject) mgmtObject->set_status(printable(newStatus).str());
+ update(l);
+}
+
+BrokerStatus Membership::getStatus() const {
+ Mutex::ScopedLock l(lock);
+ return getStatus(l);
+}
+
+BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const {
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second.getStatus();
+}
+
+BrokerInfo Membership::getInfo() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second;
}
+// FIXME aconway 2013-01-23: move to .h?
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h
index 8406dccd5d..956569fbd8 100644
--- a/qpid/cpp/src/qpid/ha/Membership.h
+++ b/qpid/cpp/src/qpid/ha/Membership.h
@@ -24,45 +24,72 @@
#include "BrokerInfo.h"
#include "types.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/types/Variant.h"
#include <boost/function.hpp>
#include <set>
#include <vector>
#include <iosfwd>
+
+namespace qmf { namespace org { namespace apache { namespace qpid { namespace ha {
+class HaBroker;
+}}}}}
+
namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace types {
+class Uuid;
+}
+
namespace ha {
+class HaBroker;
/**
* Keep track of the brokers in the membership.
- * THREAD UNSAFE: caller must serialize
+ * Send management when events on membership changes.
+ * THREAD SAFE
*/
class Membership
{
public:
- Membership(const types::Uuid& self_) : self(self_) {}
+ Membership(const BrokerInfo& info, HaBroker&);
- void reset(const BrokerInfo& b); ///< Reset to contain just one member.
+ void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>);
+
+ void clear(); ///< Clear all but self.
void add(const BrokerInfo& b);
void remove(const types::Uuid& id);
bool contains(const types::Uuid& id);
+
/** Return IDs of all READY backups other than self */
BrokerInfo::Set otherBackups() const;
void assign(const types::Variant::List&);
types::Variant::List asList() const;
- bool get(const types::Uuid& id, BrokerInfo& result);
+ bool get(const types::Uuid& id, BrokerInfo& result) const;
+
+ types::Uuid getSelf() const { return self; }
+ BrokerInfo getInfo() const;
+ BrokerStatus getStatus() const;
+ void setStatus(BrokerStatus s);
private:
- types::Uuid self;
+ void update(sys::Mutex::ScopedLock&);
+ BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
+
+ mutable sys::Mutex lock;
+ HaBroker& haBroker;
+ boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject;
+ const types::Uuid self;
BrokerInfo::Map brokers;
- friend std::ostream& operator<<(std::ostream&, const Membership&);
};
-std::ostream& operator<<(std::ostream&, const Membership&);
-
}} // namespace qpid::ha
#endif /*!QPID_HA_MEMBERSHIP_H*/
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 259b043bef..12535399e3 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -82,8 +82,10 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary* Primary::instance = 0;
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
- haBroker(hb), logPrefix("Primary: "), active(false)
+ haBroker(hb), membership(hb.getMembership()),
+ logPrefix("Primary: "), active(false)
{
+ hb.getMembership().setStatus(RECOVERING);
assert(instance == 0);
instance = this; // Let queue replicators find us.
if (expect.empty()) {
@@ -108,11 +110,18 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
hb.getBroker().getTimer().add(timerTask);
}
+
+ // Remove backup tag property from outgoing link properties.
+ framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties();
+ linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+ hb.getBroker().setLinkClientProperties(linkProperties);
+
configurationObserver.reset(new PrimaryConfigurationObserver(*this));
haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
Mutex::ScopedLock l(lock); // We are now active as a configurationObserver
checkReady(l);
+
// Allow client connections
connectionObserver.reset(new PrimaryConnectionObserver(*this));
haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
@@ -128,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLock&) {
active = true;
Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
- haBroker.activate();
+ membership.setStatus(ACTIVE);
}
}
@@ -136,7 +145,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
if (i != backups.end() && i->second->reportReady()) {
BrokerInfo info = i->second->getBrokerInfo();
info.setStatus(READY);
- haBroker.addBroker(info);
+ membership.add(info);
if (expectedBackups.erase(i->second)) {
QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
checkReady(l);
@@ -164,7 +173,7 @@ void Primary::timeoutExpectedBackups() {
// Downgrade the broker's status to CATCHUP
// The broker will get this status change when it eventually connects.
info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ membership.add(info);
}
else ++i;
}
@@ -243,7 +252,7 @@ void Primary::opened(broker::Connection& connection) {
checkReady(i, l);
}
if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ membership.add(info);
}
else
QPID_LOG(debug, logPrefix << "Accepted client connection "
@@ -260,7 +269,7 @@ void Primary::closed(broker::Connection& connection) {
// Checking isConnected() lets us ignore such spurious closes.
if (i != backups.end() && i->second->isConnected()) {
QPID_LOG(info, logPrefix << "Backup disconnected: " << info);
- haBroker.removeBroker(info.getSystemId());
+ membership.remove(info.getSystemId());
expectedBackups.erase(i->second);
backups.erase(i);
checkReady(l);
@@ -276,4 +285,9 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI
return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
}
+Role* Primary::promote() {
+ QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+ return 0;
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 08e92d5673..3097695817 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -24,6 +24,7 @@
#include "types.h"
#include "BrokerInfo.h"
+#include "Role.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -48,6 +49,7 @@ class HaBroker;
class ReplicatingSubscription;
class RemoteBackup;
class QueueGuard;
+class Membership;
/**
* State associated with a primary broker:
@@ -56,7 +58,7 @@ class QueueGuard;
*
* THREAD SAFE: called concurrently in arbitrary connection threads.
*/
-class Primary
+class Primary : public Role
{
public:
typedef boost::shared_ptr<broker::Queue> QueuePtr;
@@ -67,6 +69,11 @@ class Primary
Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups);
~Primary();
+ // Role implementation
+ std::string getLogPrefix() const { return logPrefix; }
+ Role* promote();
+ void setBrokerUrl(const Url&) {}
+
void readyReplica(const ReplicatingSubscription&);
void removeReplica(const std::string& q);
@@ -94,6 +101,7 @@ class Primary
sys::Mutex lock;
HaBroker& haBroker;
+ Membership& membership;
std::string logPrefix;
bool active;
/**
diff --git a/qpid/cpp/src/qpid/ha/Role.h b/qpid/cpp/src/qpid/ha/Role.h
new file mode 100644
index 0000000000..570c65e3e7
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Role.h
@@ -0,0 +1,55 @@
+#ifndef QPID_HA_ROLE_H
+#define QPID_HA_ROLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+class Url;
+
+namespace ha {
+
+/**
+ * A HaBroker has a role, e.g. Primary, Backup, StandAlone.
+ * Role subclasses define the actions of the broker in each role.
+ * The Role interface allows the HaBroker to pass management actions
+ * to be implemented by the role.
+ */
+class Role
+{
+ public:
+ /** Log prefix appropriate to the role */
+ virtual std::string getLogPrefix() const = 0;
+
+ /** QMF promote method handler.
+ * @return The new role if promoted, 0 if not. Caller takes ownership.
+ */
+ virtual Role* promote() = 0;
+
+ virtual void setBrokerUrl(const Url& url) = 0;
+
+ private:
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_ROLE_H*/
diff --git a/qpid/cpp/src/qpid/ha/StandAlone.h b/qpid/cpp/src/qpid/ha/StandAlone.h
new file mode 100644
index 0000000000..4bfd1810f2
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/StandAlone.h
@@ -0,0 +1,45 @@
+#ifndef QPID_HA_STANDALONE_H
+#define QPID_HA_STANDALONE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+class Url;
+
+namespace ha {
+
+/**
+ * Stand-alone role: acts as a stand-alone broker, no clustering.
+ * HA module needed to setting up replication via QMF methods.
+ */
+class StandAlone : public Role
+{
+ public:
+ std::string getLogPrefix() const { return logPrefix; }
+ Role* promote() { return 0; }
+ void setBrokerUrl(const Url&) {}
+
+ private:
+ std::string logPrefix;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_STANDALONE_H*/