diff options
| author | Alan Conway <aconway@apache.org> | 2013-01-15 21:19:03 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-01-15 21:19:03 +0000 |
| commit | 32dd02334903931ef8c7e35c2bfff6efa840f69d (patch) | |
| tree | 104e91331c518a5848bb7942a872ddcc77f89114 /qpid/cpp/src | |
| parent | b1413e235387223d9824e7102f56cd2a01b973b9 (diff) | |
| download | qpid-python-32dd02334903931ef8c7e35c2bfff6efa840f69d.tar.gz | |
QPID-4516: Undo use of atomic status in HaBroker.cpp
Fixes issue introduced in r1432273, causing build failures on windows.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1433655 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 63 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 5 |
2 files changed, 29 insertions, 39 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 8f12f5cbe2..e15fb0c096 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -64,6 +64,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) settings(s), observer(new ConnectionObserver(*this, systemId)), status(STANDALONE), + logPrefix("Broker: "), membership(systemId), replicationTest(s.replicateDefault.get()) { @@ -72,7 +73,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) // initialize() if (settings.cluster) { status = JOINING; - QPID_LOG(debug, logPrefix() << "Rejecting client connections."); + QPID_LOG(debug, logPrefix << "Rejecting client connections."); shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder); observer->setObserver(excluder, "Backup: "); broker.getConnectionObservers().add(observer); @@ -80,16 +81,6 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) } namespace { -const std::string PREFIX_PRIMARY("Primary("); -const std::string PREFIX_BACKUP("Backup("); -const std::string PREFIX_END("): "); -} -std::string HaBroker::logPrefix() const { - BrokerStatus s = status.get(); - return (isPrimary(s) ? PREFIX_PRIMARY : PREFIX_BACKUP) + printable(s).str()+PREFIX_END; -} - -namespace { const std::string NONE("none"); bool isNone(const std::string& x) { return x.empty() || x == NONE; } } @@ -102,7 +93,7 @@ void HaBroker::initialize() { broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId); - QPID_LOG(notice, logPrefix() << "Initializing: " << brokerInfo); + QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); // Set up the management object. ManagementAgent* ma = broker.getManagementAgent(); @@ -124,7 +115,7 @@ void HaBroker::initialize() { status = JOINING; backup.reset(new Backup(*this, settings)); broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); - statusCheck.reset(new StatusCheck(logPrefix(), broker.getLinkHearbeatInterval(), brokerInfo)); + statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo)); if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl)); if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl)); } @@ -137,7 +128,7 @@ void HaBroker::initialize() { } HaBroker::~HaBroker() { - QPID_LOG(notice, logPrefix() << "Shut down"); + QPID_LOG(notice, logPrefix << "Shut down"); broker.getConnectionObservers().remove(observer); } @@ -147,7 +138,7 @@ void HaBroker::recover() { BrokerInfo::Set backups; { Mutex::ScopedLock l(lock); - if (isPrimary(status.get())) { + if (isPrimary(status)) { QPID_LOG(info, "Ignoring promotion, already primary: " << brokerInfo); return; } @@ -183,12 +174,12 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, recover(); else { QPID_LOG(error, - logPrefix() << "Joining active cluster, cannot be promoted."); + logPrefix << "Joining active cluster, cannot be promoted."); throw Exception("Cluster already active, cannot be promoted."); } break; case CATCHUP: - QPID_LOG(error, logPrefix() << "Still catching up, cannot be promoted."); + QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); throw Exception("Still catching up, cannot be promoted."); break; case READY: recover(); break; @@ -209,7 +200,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, case _qmf::HaBroker::METHOD_REPLICATE: { _qmf::ArgsHaBrokerReplicate& bq_args = dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); - QPID_LOG(debug, logPrefix() << "Replicate individual queue " + QPID_LOG(debug, logPrefix << "Replicate individual queue " << bq_args.i_queue << " from " << bq_args.i_broker); boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); @@ -244,7 +235,7 @@ void HaBroker::setPublicUrl(const Url& url) { mgmtObject->set_publicUrl(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); - QPID_LOG(debug, logPrefix() << "Setting public URL to: " << url); + QPID_LOG(debug, logPrefix << "Setting public URL to: " << url); } void HaBroker::setBrokerUrl(const Url& url) { @@ -253,8 +244,8 @@ void HaBroker::setBrokerUrl(const Url& url) { Mutex::ScopedLock l(lock); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); - QPID_LOG(info, logPrefix() << "Brokers URL set to: " << url); - if (status.get() == JOINING && statusCheck.get()) statusCheck->setUrl(url); + QPID_LOG(info, logPrefix << "Brokers URL set to: " << url); + if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url); b = backup; } if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock @@ -266,12 +257,12 @@ std::vector<Url> HaBroker::getKnownBrokers() const { } void HaBroker::shutdown() { - QPID_LOG(critical, logPrefix() << "Critical error, shutting down."); + QPID_LOG(critical, logPrefix << "Critical error, shutting down."); broker.shutdown(); } BrokerStatus HaBroker::getStatus() const { - return status.get(); + return status; } void HaBroker::setStatus(BrokerStatus newStatus) { @@ -300,12 +291,12 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) { } // namespace void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) { - QPID_LOG(info, logPrefix() << "Status change: " - << printable(status.get()) << " -> " << printable(newStatus)); - bool legal = checkTransition(status.get(), newStatus); + QPID_LOG(info, logPrefix << "Status change: " + << printable(status) << " -> " << printable(newStatus)); + bool legal = checkTransition(status, newStatus); if (!legal) { - QPID_LOG(critical, logPrefix() << "Illegal state transition: " - << printable(status.get()) << " -> " << printable(newStatus)); + QPID_LOG(critical, logPrefix << "Illegal state transition: " + << printable(status) << " -> " << printable(newStatus)); shutdown(); } assert(legal); // FIXME aconway 2012-12-07: fail @@ -314,15 +305,15 @@ void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) { } void HaBroker::statusChanged(Mutex::ScopedLock& l) { - mgmtObject->set_status(printable(status.get()).str()); - brokerInfo.setStatus(status.get()); + mgmtObject->set_status(printable(status).str()); + brokerInfo.setStatus(status); membership.add(brokerInfo); membershipUpdated(l); setLinkProperties(l); } void HaBroker::membershipUpdated(Mutex::ScopedLock&) { - QPID_LOG(info, logPrefix() << "Membership: " << membership); + QPID_LOG(info, logPrefix << "Membership: " << membership); Variant::List brokers = membership.asList(); mgmtObject->set_members(brokers); broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); @@ -337,15 +328,15 @@ void HaBroker::setMembership(const Variant::List& brokers) { // Update my status to what the primary says it is. The primary sets // status to READY when we are caught up, and sets status to CATCHUP // (from READY) if we are timed out during recovery. - if (membership.get(systemId, info) && status.get() != info.getStatus()) { - assert((status.get() == CATCHUP && info.getStatus() == READY) || - (status.get() == READY && info.getStatus() == CATCHUP)); + if (membership.get(systemId, info) && status != info.getStatus()) { + assert((status == CATCHUP && info.getStatus() == READY) || + (status == READY && info.getStatus() == CATCHUP)); setStatus(info.getStatus(), l); b = backup; } membershipUpdated(l); } - if (b) b->setStatus(status.get()); // Oustside lock, avoid deadlock + if (b) b->setStatus(status); // Oustside lock, avoid deadlock } void HaBroker::addBroker(const BrokerInfo& b) { @@ -365,7 +356,7 @@ void HaBroker::removeBroker(const Uuid& id) { void HaBroker::setLinkProperties(Mutex::ScopedLock&) { framing::FieldTable linkProperties = broker.getLinkClientProperties(); - if (isBackup(status.get())) { + if (isBackup(status)) { // If this is a backup then any outgoing links are backup // links and need to be tagged. linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable()); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index e6d6e1a94f..f2a837ddea 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -32,7 +32,6 @@ #include "qmf/org/apache/qpid/ha/HaBroker.h" #include "qpid/management/Manageable.h" #include "qpid/types/Variant.h" -#include "qpid/sys/AtomicValue.h" #include <set> #include <boost/shared_ptr.hpp> @@ -108,7 +107,6 @@ class HaBroker : public management::Manageable void recover(); void statusChanged(sys::Mutex::ScopedLock&); void setLinkProperties(sys::Mutex::ScopedLock&); - std::string logPrefix() const; std::vector<Url> getKnownBrokers() const; @@ -125,7 +123,8 @@ class HaBroker : public management::Manageable qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject; Url publicUrl, brokerUrl; std::vector<Url> knownBrokers; - sys::AtomicValue<BrokerStatus> status; + BrokerStatus status; + std::string logPrefix; BrokerInfo brokerInfo; Membership membership; ReplicationTest replicationTest; |
