diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 100 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.h | 20 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerInfo.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerInfo.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 43 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 227 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 49 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Membership.cpp | 110 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Membership.h | 45 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 26 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 10 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Role.h | 55 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/StandAlone.h | 45 |
14 files changed, 442 insertions, 298 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 6317520a56..2aabf6342b 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -20,9 +20,12 @@ */ #include "Backup.h" #include "BrokerReplicator.h" +#include "ConnectionObserver.h" #include "HaBroker.h" +#include "Primary.h" #include "ReplicatingSubscription.h" #include "Settings.h" +#include "StatusCheck.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" @@ -44,28 +47,38 @@ using namespace framing; using namespace broker; using types::Variant; using std::string; +using sys::Mutex; Backup::Backup(HaBroker& hb, const Settings& s) : - logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s) + logPrefix("Backup: "), membership(hb.getMembership()), stopped(false), + haBroker(hb), broker(hb.getBroker()), settings(s), + statusCheck( + new StatusCheck( + logPrefix, broker.getLinkHearbeatInterval(), hb.getBrokerInfo())) { - // Empty brokerUrl means delay initialization until seBrokertUrl() is called. - if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); + // Set link properties to tag outgoing links. + framing::FieldTable linkProperties = broker.getLinkClientProperties(); + linkProperties.setTable( + ConnectionObserver::BACKUP_TAG, hb.getBrokerInfo().asFieldTable()); + broker.setLinkClientProperties(linkProperties); } -void Backup::initialize(const Url& brokers) { - if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); - QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); - string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; - types::Uuid uuid(true); - // Declare the link - std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( - broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), - brokers[0].host, brokers[0].port, protocol, - false, // durable - settings.mechanism, settings.username, settings.password, - false); // no amq.failover - don't want to use client URL. - { - sys::Mutex::ScopedLock l(lock); +void Backup::setBrokerUrl(const Url& brokers) { + if (brokers.empty()) return; + Mutex::ScopedLock l(lock); + if (stopped) return; + if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers); + if (!link) { // Not yet initialized + QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); + string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; + types::Uuid uuid(true); + std::pair<Link::shared_ptr, bool> result; + result = broker.getLinks().declare( + broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), + brokers[0].host, brokers[0].port, protocol, + false, // durable + settings.mechanism, settings.username, settings.password, + false); // no amq.failover - don't want to use client URL. link = result.first; replicator.reset(new BrokerReplicator(haBroker, link)); replicator->initialize(); @@ -74,8 +87,9 @@ void Backup::initialize(const Url& brokers) { link->setUrl(brokers); // Outside the lock, once set link doesn't change. } -Backup::~Backup() { - QPID_LOG(debug, logPrefix << "No longer a backup."); +void Backup::stop(Mutex::ScopedLock&) { + if (stopped) return; + QPID_LOG(debug, logPrefix << "Leaving backup role."); if (link) link->close(); if (replicator.get()) { broker.getExchanges().destroy(replicator->getName()); @@ -84,33 +98,45 @@ Backup::~Backup() { } } -// Called via management. -void Backup::setBrokerUrl(const Url& url) { - // Ignore empty URLs seen during start-up for some tests. - if (url.empty()) return; - bool linkSet = false; +Role* Backup::recover(Mutex::ScopedLock&) { + BrokerInfo::Set backups; { - sys::Mutex::ScopedLock l(lock); - linkSet = link; + Mutex::ScopedLock l(lock); + if (stopped) return 0; + stop(l); // Stop backup activity before starting primary. + QPID_LOG(notice, "Promoting to primary: " << haBroker.getBrokerInfo()); + // Reset membership before allowing backups to connect. + backups = membership.otherBackups(); + membership.clear(); + return new Primary(haBroker, backups); } - if (linkSet) - link->setUrl(url); // Outside lock, once set link doesn't change - else - initialize(url); // Deferred initialization } -void Backup::setStatus(BrokerStatus status) { - switch (status) { - case READY: - QPID_LOG(notice, logPrefix << "Ready to become primary."); +Role* Backup::promote() { + Mutex::ScopedLock l(lock); + if (stopped) return 0; + switch (haBroker.getStatus()) { + case JOINING: + if (statusCheck->canPromote()) return recover(l); + else { + QPID_LOG(error, + logPrefix << "Joining active cluster, cannot be promoted."); + throw Exception("Joining active cluster, cannot be promoted."); + } break; case CATCHUP: - QPID_LOG(notice, logPrefix << "Catching up on primary, cannot be promoted."); + QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); + throw Exception("Still catching up, cannot be promoted."); break; + case READY: return recover(l); break; default: - // FIXME aconway 2012-12-07: fail - assert(0); + assert(0); // Not a valid state for the Backup role.. } } +Backup::~Backup() { + Mutex::ScopedLock l(lock); + stop(l); +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 4f2d5babde..4943ca5e2e 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -22,6 +22,7 @@ * */ +#include "Role.h" #include "Settings.h" #include "qpid/Url.h" #include "qpid/sys/Mutex.h" @@ -38,30 +39,41 @@ namespace ha { class Settings; class BrokerReplicator; class HaBroker; +class StatusCheck; +class Membership; /** - * State associated with a backup broker. Manages connections to primary. + * Backup role: Manages connections to primary, replicates management events and queue contents. * * THREAD SAFE */ -class Backup +class Backup : public Role { public: Backup(HaBroker&, const Settings&); ~Backup(); + + std::string getLogPrefix() const { return logPrefix; } + void setBrokerUrl(const Url&); - void setStatus(BrokerStatus); + + Role* promote(); private: - void initialize(const Url&); + void stop(sys::Mutex::ScopedLock&); + Role* recover(sys::Mutex::ScopedLock&); + std::string logPrefix; + Membership& membership; sys::Mutex lock; + bool stopped; HaBroker& haBroker; broker::Broker& broker; Settings settings; boost::shared_ptr<broker::Link> link; boost::shared_ptr<BrokerReplicator> replicator; + std::auto_ptr<StatusCheck> statusCheck; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp index 5a8dfa512a..a0c4af88ca 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp @@ -45,8 +45,9 @@ using framing::FieldTable; BrokerInfo::BrokerInfo() : port(0), status(JOINING) {} -BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) : - hostName(host), port(port_), systemId(id), status(JOINING) +BrokerInfo::BrokerInfo(const types::Uuid& id, BrokerStatus s, + const std::string& host, uint16_t port_) : + hostName(host), port(port_), systemId(id), status(s) { updateLogId(); } diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h index 5131e23be2..6142e03f98 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.h +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h @@ -44,7 +44,8 @@ class BrokerInfo typedef std::map<types::Uuid, BrokerInfo> Map; BrokerInfo(); - BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id); + BrokerInfo(const types::Uuid& id, BrokerStatus, + const std::string& host=std::string(), uint16_t port=0); BrokerInfo(const framing::FieldTable& ft) { assign(ft); } BrokerInfo(const types::Variant::Map& m) { assign(m); } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index f51a69e461..37c2a2d6b4 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -227,7 +227,9 @@ class BrokerReplicator::UpdateTracker { typedef std::set<std::string> Names; typedef boost::function<void (const std::string&)> CleanFn; - UpdateTracker(CleanFn f, const ReplicationTest& rt) : cleanFn(f), repTest(rt) {} + UpdateTracker(const std::string& type_, // "queue" or "exchange" + CleanFn f, const ReplicationTest& rt) + : type(type_), cleanFn(f), repTest(rt) {} /** Destructor cleans up remaining initial queues. */ ~UpdateTracker() { @@ -264,6 +266,12 @@ class BrokerReplicator::UpdateTracker { } private: + void clean(const std::string& name) { + QPID_LOG(info, "Backup updated, deleting " << type << " " << name); + cleanFn(name); + } + + std::string type; Names initial, events; CleanFn cleanFn; ReplicationTest repTest; @@ -353,13 +361,15 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH initialized = true; exchangeTracker.reset( - new UpdateTracker(boost::bind(&BrokerReplicator::deleteExchange, this, _1), + new UpdateTracker("exchange", + boost::bind(&BrokerReplicator::deleteExchange, this, _1), replicationTest)); exchanges.eachExchange( boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1)); queueTracker.reset( - new UpdateTracker(boost::bind(&BrokerReplicator::deleteQueue, this, _1, true), + new UpdateTracker("queue", + boost::bind(&BrokerReplicator::deleteQueue, this, _1, true), replicationTest)); queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1)); @@ -394,7 +404,7 @@ void BrokerReplicator::route(Deliverable& msg) { // We transition from JOINING->CATCHUP on the first message received from the primary. // Until now we couldn't be sure if we had a good connection to the primary. if (haBroker.getStatus() == JOINING) { - haBroker.setStatus(CATCHUP); + haBroker.getMembership().setStatus(CATCHUP); QPID_LOG(notice, logPrefix << "Connected to primary " << primary); } Variant::List list; @@ -572,7 +582,7 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { Variant::List members = values[MEMBERS].asList(); - haBroker.setMembership(members); + setMembership(members); } void BrokerReplicator::doEventSubscribe(Variant::Map& values) { @@ -724,7 +734,7 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { if (mine != primary) throw Exception(QPID_MSG("Replicate default on backup (" << mine << ") does not match primary (" << primary << ")")); - haBroker.setMembership(values[MEMBERS].asList()); + setMembership(values[MEMBERS].asList()); } catch (const std::exception& e) { haBroker.shutdown( QPID_MSG(logPrefix << "Invalid HA Broker response: " << e.what() @@ -861,4 +871,25 @@ void BrokerReplicator::disconnected() { boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1)); } +void BrokerReplicator::setMembership(const Variant::List& brokers) { + Membership& membership(haBroker.getMembership()); + membership.assign(brokers); + // Check if the primary has signalled a change in my status: + // from CATCHUP to READY when we are caught up. + // from READY TO CATCHUP if we are timed out during fail-over. + BrokerInfo info; + if (membership.get(membership.getSelf(), info)) { + BrokerStatus oldStatus = haBroker.getStatus(); + BrokerStatus newStatus = info.getStatus(); + if (oldStatus == CATCHUP && newStatus == READY) { + QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready"); + haBroker.getMembership().setStatus(READY); + } + else if (oldStatus == READY && newStatus == CATCHUP) { + QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up"); + haBroker.getMembership().setStatus(CATCHUP); + } + } +} + }} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 9134163575..9161227c0f 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -136,6 +136,8 @@ class BrokerReplicator : public broker::Exchange, void autoDeleteCheck(boost::shared_ptr<broker::Exchange>); void disconnected(); + void setMembership(const types::Variant::List&); // Set membership from list. + std::string logPrefix; std::string userId, remoteHost; ReplicationTest replicationTest; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 0fd8c9e518..c4cb640f97 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -26,7 +26,7 @@ #include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "Settings.h" -#include "StatusCheck.h" +#include "StandAlone.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" @@ -42,7 +42,6 @@ #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h" -#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include "qpid/log/Statement.h" #include <boost/shared_ptr.hpp> @@ -56,24 +55,23 @@ using types::Variant; using types::Uuid; using sys::Mutex; using boost::shared_ptr; +using boost::dynamic_pointer_cast; // Called in Plugin::earlyInitialize HaBroker::HaBroker(broker::Broker& b, const Settings& s) - : broker(b), - systemId(broker.getSystem()->getSystemId().data()), + : systemId(b.getSystem()->getSystemId().data()), settings(s), + replicationTest(s.replicateDefault.get()), + broker(b), observer(new ConnectionObserver(*this, systemId)), - status(STANDALONE), - logPrefix("Broker: "), - membership(systemId), - replicationTest(s.replicateDefault.get()) + role(new StandAlone), + membership(BrokerInfo(systemId, STANDALONE), *this) { // If we are joining a cluster we must start excluding clients now, // otherwise there's a window for a client to connect before we get to // initialize() if (settings.cluster) { - status = JOINING; - QPID_LOG(debug, logPrefix << "Rejecting client connections."); + QPID_LOG(debug, role->getLogPrefix() << "Rejecting client connections."); shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder); observer->setObserver(excluder, "Backup: "); broker.getConnectionObservers().add(observer); @@ -87,13 +85,16 @@ bool isNone(const std::string& x) { return x.empty() || x == NONE; } // Called in Plugin::initialize void HaBroker::initialize() { - // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port. - brokerInfo = BrokerInfo( - broker.getSystem()->getNodeName(), - broker.getPort(broker::Broker::TCP_TRANSPORT), - systemId); - QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); + membership.add( + BrokerInfo( + membership.getSelf(), + settings.cluster ? JOINING : membership.getStatus(), + broker.getSystem()->getNodeName(), + broker.getPort(broker::Broker::TCP_TRANSPORT) + ) + ); + QPID_LOG(notice, role->getLogPrefix() << "Initializing: " << membership.getInfo()); // Set up the management object. ManagementAgent* ma = broker.getManagementAgent(); @@ -104,90 +105,34 @@ void HaBroker::initialize() { mgmtObject->set_replicateDefault(settings.replicateDefault.str()); mgmtObject->set_systemId(systemId); ma->addObject(mgmtObject); + membership.setMgmtObject(mgmtObject); // Register a factory for replicating subscriptions. broker.getConsumerFactories().add( - boost::shared_ptr<ReplicatingSubscription::Factory>( + shared_ptr<ReplicatingSubscription::Factory>( new ReplicatingSubscription::Factory())); // If we are in a cluster, start as backup in joining state. if (settings.cluster) { - status = JOINING; - backup.reset(new Backup(*this, settings)); + assert(membership.getStatus() == JOINING); + role.reset(new Backup(*this, settings)); broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); - statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo)); if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl)); if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl)); } - - - // NOTE: lock is not needed in a constructor, but create one - // to pass to functions that have a ScopedLock parameter. - Mutex::ScopedLock l(lock); - statusChanged(l); } HaBroker::~HaBroker() { - QPID_LOG(notice, logPrefix << "Shut down"); + QPID_LOG(notice, role->getLogPrefix() << "Shut down"); broker.getConnectionObservers().remove(observer); } -// Called from ManagementMethod on promote. -void HaBroker::recover() { - boost::shared_ptr<Backup> b; - BrokerInfo::Set backups; - { - Mutex::ScopedLock l(lock); - if (isPrimary(status)) { - QPID_LOG(info, "Ignoring promotion, already primary: " << brokerInfo); - return; - } - QPID_LOG(notice, "Promoting to primary: " << brokerInfo); - // Reset membership before allowing backups to connect. - backups = membership.otherBackups(); - membership.reset(brokerInfo); - // No longer replicating, close link. Note: link must be closed before we - // setStatus(RECOVERING) as that will remove our broker info from the - // outgoing link properties so we won't recognize self-connects. - b = backup; - backup.reset(); // Reset in lock. - } - b.reset(); // Call destructor outside of lock. - { - Mutex::ScopedLock l(lock); - setStatus(RECOVERING, l); - // Drop the lock, new Primary may call back on activate. - } - // Outside of lock, may call back on activate() - primary.reset(new Primary(*this, backups)); // Starts primary-ready check. -} - -// Called back from Primary active check. -void HaBroker::activate() { setStatus(ACTIVE); } - Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { - switch (getStatus()) { - case JOINING: - if (statusCheck->canPromote()) - recover(); - else { - QPID_LOG(error, - logPrefix << "Joining active cluster, cannot be promoted."); - throw Exception("Cluster already active, cannot be promoted."); - } - break; - case CATCHUP: - QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); - throw Exception("Still catching up, cannot be promoted."); - break; - case READY: recover(); break; - case RECOVERING: break; - case ACTIVE: break; - case STANDALONE: break; - } - break; + Role* r = role->promote(); + if (r) role.reset(r); + break; } case _qmf::HaBroker::METHOD_SETBROKERSURL: { setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url)); @@ -200,10 +145,10 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, case _qmf::HaBroker::METHOD_REPLICATE: { _qmf::ArgsHaBrokerReplicate& bq_args = dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); - QPID_LOG(debug, logPrefix << "Replicate individual queue " + QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue " << bq_args.i_queue << " from " << bq_args.i_broker); - boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); + shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); Url url(bq_args.i_broker); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; Uuid uuid(true); @@ -213,10 +158,10 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, false, // durable settings.mechanism, settings.username, settings.password, false); // no amq.failover - don't want to use client URL. - boost::shared_ptr<broker::Link> link = result.first; + shared_ptr<broker::Link> link = result.first; link->setUrl(url); // Create a queue replicator - boost::shared_ptr<QueueReplicator> qr( + shared_ptr<QueueReplicator> qr( new QueueReplicator(*this, queue, link)); qr->activate(); broker.getExchanges().registerExchange(qr); @@ -235,20 +180,17 @@ void HaBroker::setPublicUrl(const Url& url) { mgmtObject->set_publicUrl(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); - QPID_LOG(debug, logPrefix << "Setting public URL to: " << url); + QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url); } void HaBroker::setBrokerUrl(const Url& url) { - boost::shared_ptr<Backup> b; { Mutex::ScopedLock l(lock); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); - QPID_LOG(info, logPrefix << "Brokers URL set to: " << url); - if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url); - b = backup; + QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url); } - if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock + role->setBrokerUrl(url); // Oustside lock } std::vector<Url> HaBroker::getKnownBrokers() const { @@ -263,110 +205,7 @@ void HaBroker::shutdown(const std::string& message) { } BrokerStatus HaBroker::getStatus() const { - return status; -} - -void HaBroker::setStatus(BrokerStatus newStatus) { - Mutex::ScopedLock l(lock); - setStatus(newStatus, l); -} - -namespace { -bool checkTransition(BrokerStatus from, BrokerStatus to) { - // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. - static const BrokerStatus TRANSITIONS[][2] = { - { JOINING, CATCHUP }, // Connected to primary - { JOINING, RECOVERING }, // Chosen as initial primary. - { CATCHUP, READY }, // Caught up all queues, ready to take over. - { READY, RECOVERING }, // Chosen as new primary - { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. - { RECOVERING, ACTIVE } // All expected backups are ready - }; - static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); - for (size_t i = 0; i < N; ++i) { - if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) - return true; - } - return false; -} -} // namespace - -void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) { - QPID_LOG(info, logPrefix << "Status change: " - << printable(status) << " -> " << printable(newStatus)); - bool legal = checkTransition(status, newStatus); - assert(legal); - if (!legal) { - shutdown(QPID_MSG(logPrefix << "Illegal state transition: " - << printable(status) << " -> " << printable(newStatus))); - } - status = newStatus; - statusChanged(l); -} - -void HaBroker::statusChanged(Mutex::ScopedLock& l) { - mgmtObject->set_status(printable(status).str()); - brokerInfo.setStatus(status); - membership.add(brokerInfo); - membershipUpdated(l); - setLinkProperties(l); -} - -void HaBroker::membershipUpdated(Mutex::ScopedLock&) { - QPID_LOG(info, logPrefix << "Membership: " << membership); - Variant::List brokers = membership.asList(); - mgmtObject->set_members(brokers); - broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); -} - -void HaBroker::setMembership(const Variant::List& brokers) { - boost::shared_ptr<Backup> b; - { - Mutex::ScopedLock l(lock); - membership.assign(brokers); - BrokerInfo info; - // Update my status to what the primary says it is. The primary sets - // status to READY when we are caught up, and sets status to CATCHUP - // (from READY) if we are timed out during recovery. - if (membership.get(systemId, info) && status != info.getStatus()) { - assert((status == CATCHUP && info.getStatus() == READY) || - (status == READY && info.getStatus() == CATCHUP)); - setStatus(info.getStatus(), l); - b = backup; - } - membershipUpdated(l); - } - if (b) b->setStatus(status); // Oustside lock, avoid deadlock -} - -void HaBroker::addBroker(const BrokerInfo& b) { - Mutex::ScopedLock l(lock); - membership.add(b); - membershipUpdated(l); -} - -void HaBroker::removeBroker(const Uuid& id) { - Mutex::ScopedLock l(lock); - BrokerInfo info; - if (membership.get(id, info)) { - membership.remove(id); - membershipUpdated(l); - } -} - -void HaBroker::setLinkProperties(Mutex::ScopedLock&) { - framing::FieldTable linkProperties = broker.getLinkClientProperties(); - if (isBackup(status)) { - // If this is a backup then any outgoing links are backup - // links and need to be tagged. - linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable()); - } - else { - // If this is a primary then any outgoing links are federation links - // and should not be tagged. - linkProperties.erase(ConnectionObserver::BACKUP_TAG); - } - broker.setLinkClientProperties(linkProperties); + return membership.getStatus(); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 71e856fbeb..7ba023129c 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -53,12 +53,15 @@ namespace ha { class Backup; class ConnectionObserver; class Primary; -class StatusCheck; - +class Role; /** * HA state and actions associated with a HA broker. Holds all the management info. * * THREAD SAFE: may be called in arbitrary broker IO or timer threads. + + * NOTE: HaBroker and Role subclasses follow this lock hierarchy: + * - HaBroker MUST NOT hold its own lock across calls Role subclasses. + * - Role subclasses MAY hold their locks accross calls to HaBroker. */ class HaBroker : public management::Manageable { @@ -82,53 +85,37 @@ class HaBroker : public management::Manageable void shutdown(const std::string& message); BrokerStatus getStatus() const; - void setStatus(BrokerStatus); - void activate(); - - Backup* getBackup() { return backup.get(); } ReplicationTest getReplicationTest() const { return replicationTest; } - boost::shared_ptr<ConnectionObserver> getObserver() { return observer; } - const BrokerInfo& getBrokerInfo() const { return brokerInfo; } - - void setMembership(const types::Variant::List&); // Set membership from list. - void addBroker(const BrokerInfo& b); // Add a broker to the membership. - void removeBroker(const types::Uuid& id); // Remove a broker from membership. - + BrokerInfo getBrokerInfo() const { return membership.getInfo(); } + Membership& getMembership() { return membership; } types::Uuid getSystemId() const { return systemId; } private: + void setPublicUrl(const Url&); void setBrokerUrl(const Url&); void updateClientUrl(sys::Mutex::ScopedLock&); - void setStatus(BrokerStatus, sys::Mutex::ScopedLock&); - void recover(); - void statusChanged(sys::Mutex::ScopedLock&); - void setLinkProperties(sys::Mutex::ScopedLock&); - std::vector<Url> getKnownBrokers() const; - void membershipUpdated(sys::Mutex::ScopedLock&); - - broker::Broker& broker; - types::Uuid systemId; + // Immutable members + const types::Uuid systemId; const Settings settings; + // Member variables protected by lock mutable sys::Mutex lock; - boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary - boost::shared_ptr<Backup> backup; - boost::shared_ptr<Primary> primary; - qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject; Url publicUrl, brokerUrl; std::vector<Url> knownBrokers; - BrokerStatus status; - std::string logPrefix; - BrokerInfo brokerInfo; - Membership membership; ReplicationTest replicationTest; - std::auto_ptr<StatusCheck> statusCheck; + + // Independently thread-safe member variables + broker::Broker& broker; + qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject; + boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary + boost::shared_ptr<Role> role; + Membership membership; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp index 74580f9b1e..d33d57c37f 100644 --- a/qpid/cpp/src/qpid/ha/Membership.cpp +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -19,6 +19,12 @@ * */ #include "Membership.h" +#include "HaBroker.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" +#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" +#include "qmf/org/apache/qpid/ha/HaBroker.h" #include <boost/bind.hpp> #include <iostream> #include <iterator> @@ -26,37 +32,57 @@ namespace qpid { namespace ha { +namespace _qmf = ::qmf::org::apache::qpid::ha; -void Membership::reset(const BrokerInfo& b) { +using sys::Mutex; +using types::Variant; + +Membership::Membership(const BrokerInfo& info, HaBroker& b) + : haBroker(b), self(info.getSystemId()) +{ + brokers[self] = info; +} + +void Membership::clear() { + Mutex::ScopedLock l(lock); + BrokerInfo me = brokers[self]; brokers.clear(); - brokers[b.getSystemId()] = b; + brokers[self] = me; } void Membership::add(const BrokerInfo& b) { + Mutex::ScopedLock l(lock); brokers[b.getSystemId()] = b; + update(l); } void Membership::remove(const types::Uuid& id) { + Mutex::ScopedLock l(lock); BrokerInfo::Map::iterator i = brokers.find(id); if (i != brokers.end()) { brokers.erase(i); - } + update(l); + } } bool Membership::contains(const types::Uuid& id) { + Mutex::ScopedLock l(lock); return brokers.find(id) != brokers.end(); } void Membership::assign(const types::Variant::List& list) { + Mutex::ScopedLock l(lock); brokers.clear(); for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { BrokerInfo b(i->asMap()); brokers[b.getSystemId()] = b; } + update(l); } types::Variant::List Membership::asList() const { + Mutex::ScopedLock l(lock); types::Variant::List list; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) list.push_back(i->second.asMap()); @@ -64,6 +90,7 @@ types::Variant::List Membership::asList() const { } BrokerInfo::Set Membership::otherBackups() const { + Mutex::ScopedLock l(lock); BrokerInfo::Set result; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (i->second.getStatus() == READY && i->second.getSystemId() != self) @@ -71,15 +98,84 @@ BrokerInfo::Set Membership::otherBackups() const { return result; } -bool Membership::get(const types::Uuid& id, BrokerInfo& result) { - BrokerInfo::Map::iterator i = brokers.find(id); +bool Membership::get(const types::Uuid& id, BrokerInfo& result) const { + Mutex::ScopedLock l(lock); + BrokerInfo::Map::const_iterator i = brokers.find(id); if (i == brokers.end()) return false; result = i->second; return true; } -std::ostream& operator<<(std::ostream& o, const Membership& members) { - return o << members.brokers; +void Membership::update(Mutex::ScopedLock& l) { + QPID_LOG(info, "Membership: " << brokers); + Variant::List brokers = asList(); + if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str()); + if (mgmtObject) mgmtObject->set_members(brokers); + haBroker.getBroker().getManagementAgent()->raiseEvent( + _qmf::EventMembersUpdate(brokers)); +} + +void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) { + Mutex::ScopedLock l(lock); + mgmtObject = mo; + update(l); +} + + +namespace { +bool checkTransition(BrokerStatus from, BrokerStatus to) { + // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. + static const BrokerStatus TRANSITIONS[][2] = { + { STANDALONE, JOINING }, // Initialization of backup broker + { JOINING, CATCHUP }, // Connected to primary + { JOINING, RECOVERING }, // Chosen as initial primary. + { CATCHUP, READY }, // Caught up all queues, ready to take over. + { READY, RECOVERING }, // Chosen as new primary + { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. + { RECOVERING, ACTIVE } // All expected backups are ready + }; + static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); + for (size_t i = 0; i < N; ++i) { + if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) + return true; + } + return false; +} +} // namespace + +void Membership::setStatus(BrokerStatus newStatus) { + BrokerStatus status = getStatus(); + QPID_LOG(info, "Status change: " + << printable(status) << " -> " << printable(newStatus)); + bool legal = checkTransition(status, newStatus); + if (!legal) { + haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status) + << " -> " << printable(newStatus))); + } + + Mutex::ScopedLock l(lock); + brokers[self].setStatus(newStatus); + if (mgmtObject) mgmtObject->set_status(printable(newStatus).str()); + update(l); +} + +BrokerStatus Membership::getStatus() const { + Mutex::ScopedLock l(lock); + return getStatus(l); +} + +BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const { + BrokerInfo::Map::const_iterator i = brokers.find(self); + assert(i != brokers.end()); + return i->second.getStatus(); +} + +BrokerInfo Membership::getInfo() const { + Mutex::ScopedLock l(lock); + BrokerInfo::Map::const_iterator i = brokers.find(self); + assert(i != brokers.end()); + return i->second; } +// FIXME aconway 2013-01-23: move to .h? }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h index 8406dccd5d..956569fbd8 100644 --- a/qpid/cpp/src/qpid/ha/Membership.h +++ b/qpid/cpp/src/qpid/ha/Membership.h @@ -24,45 +24,72 @@ #include "BrokerInfo.h" #include "types.h" -#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Mutex.h" #include "qpid/types/Variant.h" #include <boost/function.hpp> #include <set> #include <vector> #include <iosfwd> + +namespace qmf { namespace org { namespace apache { namespace qpid { namespace ha { +class HaBroker; +}}}}} + namespace qpid { + +namespace broker { +class Broker; +} + +namespace types { +class Uuid; +} + namespace ha { +class HaBroker; /** * Keep track of the brokers in the membership. - * THREAD UNSAFE: caller must serialize + * Send management when events on membership changes. + * THREAD SAFE */ class Membership { public: - Membership(const types::Uuid& self_) : self(self_) {} + Membership(const BrokerInfo& info, HaBroker&); - void reset(const BrokerInfo& b); ///< Reset to contain just one member. + void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>); + + void clear(); ///< Clear all but self. void add(const BrokerInfo& b); void remove(const types::Uuid& id); bool contains(const types::Uuid& id); + /** Return IDs of all READY backups other than self */ BrokerInfo::Set otherBackups() const; void assign(const types::Variant::List&); types::Variant::List asList() const; - bool get(const types::Uuid& id, BrokerInfo& result); + bool get(const types::Uuid& id, BrokerInfo& result) const; + + types::Uuid getSelf() const { return self; } + BrokerInfo getInfo() const; + BrokerStatus getStatus() const; + void setStatus(BrokerStatus s); private: - types::Uuid self; + void update(sys::Mutex::ScopedLock&); + BrokerStatus getStatus(sys::Mutex::ScopedLock&) const; + + mutable sys::Mutex lock; + HaBroker& haBroker; + boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject; + const types::Uuid self; BrokerInfo::Map brokers; - friend std::ostream& operator<<(std::ostream&, const Membership&); }; -std::ostream& operator<<(std::ostream&, const Membership&); - }} // namespace qpid::ha #endif /*!QPID_HA_MEMBERSHIP_H*/ diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 259b043bef..12535399e3 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -82,8 +82,10 @@ class ExpectedBackupTimerTask : public sys::TimerTask { Primary* Primary::instance = 0; Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : - haBroker(hb), logPrefix("Primary: "), active(false) + haBroker(hb), membership(hb.getMembership()), + logPrefix("Primary: "), active(false) { + hb.getMembership().setStatus(RECOVERING); assert(instance == 0); instance = this; // Let queue replicators find us. if (expect.empty()) { @@ -108,11 +110,18 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : hb.getBroker().getTimer().add(timerTask); } + + // Remove backup tag property from outgoing link properties. + framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties(); + linkProperties.erase(ConnectionObserver::BACKUP_TAG); + hb.getBroker().setLinkClientProperties(linkProperties); + configurationObserver.reset(new PrimaryConfigurationObserver(*this)); haBroker.getBroker().getConfigurationObservers().add(configurationObserver); Mutex::ScopedLock l(lock); // We are now active as a configurationObserver checkReady(l); + // Allow client connections connectionObserver.reset(new PrimaryConnectionObserver(*this)); haBroker.getObserver()->setObserver(connectionObserver, logPrefix); @@ -128,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLock&) { active = true; Mutex::ScopedUnlock u(lock); // Don't hold lock across callback QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active."); - haBroker.activate(); + membership.setStatus(ACTIVE); } } @@ -136,7 +145,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { if (i != backups.end() && i->second->reportReady()) { BrokerInfo info = i->second->getBrokerInfo(); info.setStatus(READY); - haBroker.addBroker(info); + membership.add(info); if (expectedBackups.erase(i->second)) { QPID_LOG(info, logPrefix << "Expected backup is ready: " << info); checkReady(l); @@ -164,7 +173,7 @@ void Primary::timeoutExpectedBackups() { // Downgrade the broker's status to CATCHUP // The broker will get this status change when it eventually connects. info.setStatus(CATCHUP); - haBroker.addBroker(info); + membership.add(info); } else ++i; } @@ -243,7 +252,7 @@ void Primary::opened(broker::Connection& connection) { checkReady(i, l); } if (info.getStatus() == JOINING) info.setStatus(CATCHUP); - haBroker.addBroker(info); + membership.add(info); } else QPID_LOG(debug, logPrefix << "Accepted client connection " @@ -260,7 +269,7 @@ void Primary::closed(broker::Connection& connection) { // Checking isConnected() lets us ignore such spurious closes. if (i != backups.end() && i->second->isConnected()) { QPID_LOG(info, logPrefix << "Backup disconnected: " << info); - haBroker.removeBroker(info.getSystemId()); + membership.remove(info.getSystemId()); expectedBackups.erase(i->second); backups.erase(i); checkReady(l); @@ -276,4 +285,9 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q); } +Role* Primary::promote() { + QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo()); + return 0; +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 08e92d5673..3097695817 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -24,6 +24,7 @@ #include "types.h" #include "BrokerInfo.h" +#include "Role.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> #include <boost/intrusive_ptr.hpp> @@ -48,6 +49,7 @@ class HaBroker; class ReplicatingSubscription; class RemoteBackup; class QueueGuard; +class Membership; /** * State associated with a primary broker: @@ -56,7 +58,7 @@ class QueueGuard; * * THREAD SAFE: called concurrently in arbitrary connection threads. */ -class Primary +class Primary : public Role { public: typedef boost::shared_ptr<broker::Queue> QueuePtr; @@ -67,6 +69,11 @@ class Primary Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups); ~Primary(); + // Role implementation + std::string getLogPrefix() const { return logPrefix; } + Role* promote(); + void setBrokerUrl(const Url&) {} + void readyReplica(const ReplicatingSubscription&); void removeReplica(const std::string& q); @@ -94,6 +101,7 @@ class Primary sys::Mutex lock; HaBroker& haBroker; + Membership& membership; std::string logPrefix; bool active; /** diff --git a/qpid/cpp/src/qpid/ha/Role.h b/qpid/cpp/src/qpid/ha/Role.h new file mode 100644 index 0000000000..570c65e3e7 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Role.h @@ -0,0 +1,55 @@ +#ifndef QPID_HA_ROLE_H +#define QPID_HA_ROLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> + +namespace qpid { +class Url; + +namespace ha { + +/** + * A HaBroker has a role, e.g. Primary, Backup, StandAlone. + * Role subclasses define the actions of the broker in each role. + * The Role interface allows the HaBroker to pass management actions + * to be implemented by the role. + */ +class Role +{ + public: + /** Log prefix appropriate to the role */ + virtual std::string getLogPrefix() const = 0; + + /** QMF promote method handler. + * @return The new role if promoted, 0 if not. Caller takes ownership. + */ + virtual Role* promote() = 0; + + virtual void setBrokerUrl(const Url& url) = 0; + + private: +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_ROLE_H*/ diff --git a/qpid/cpp/src/qpid/ha/StandAlone.h b/qpid/cpp/src/qpid/ha/StandAlone.h new file mode 100644 index 0000000000..4bfd1810f2 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/StandAlone.h @@ -0,0 +1,45 @@ +#ifndef QPID_HA_STANDALONE_H +#define QPID_HA_STANDALONE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +class Url; + +namespace ha { + +/** + * Stand-alone role: acts as a stand-alone broker, no clustering. + * HA module needed to setting up replication via QMF methods. + */ +class StandAlone : public Role +{ + public: + std::string getLogPrefix() const { return logPrefix; } + Role* promote() { return 0; } + void setBrokerUrl(const Url&) {} + + private: + std::string logPrefix; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_STANDALONE_H*/ |
