summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-01-15 21:19:03 +0000
committerAlan Conway <aconway@apache.org>2013-01-15 21:19:03 +0000
commit32dd02334903931ef8c7e35c2bfff6efa840f69d (patch)
tree104e91331c518a5848bb7942a872ddcc77f89114 /qpid/cpp/src
parentb1413e235387223d9824e7102f56cd2a01b973b9 (diff)
downloadqpid-python-32dd02334903931ef8c7e35c2bfff6efa840f69d.tar.gz
QPID-4516: Undo use of atomic status in HaBroker.cpp
Fixes issue introduced in r1432273, causing build failures on windows. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1433655 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp63
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h5
2 files changed, 29 insertions, 39 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 8f12f5cbe2..e15fb0c096 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -64,6 +64,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
settings(s),
observer(new ConnectionObserver(*this, systemId)),
status(STANDALONE),
+ logPrefix("Broker: "),
membership(systemId),
replicationTest(s.replicateDefault.get())
{
@@ -72,7 +73,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
// initialize()
if (settings.cluster) {
status = JOINING;
- QPID_LOG(debug, logPrefix() << "Rejecting client connections.");
+ QPID_LOG(debug, logPrefix << "Rejecting client connections.");
shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
observer->setObserver(excluder, "Backup: ");
broker.getConnectionObservers().add(observer);
@@ -80,16 +81,6 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
}
namespace {
-const std::string PREFIX_PRIMARY("Primary(");
-const std::string PREFIX_BACKUP("Backup(");
-const std::string PREFIX_END("): ");
-}
-std::string HaBroker::logPrefix() const {
- BrokerStatus s = status.get();
- return (isPrimary(s) ? PREFIX_PRIMARY : PREFIX_BACKUP) + printable(s).str()+PREFIX_END;
-}
-
-namespace {
const std::string NONE("none");
bool isNone(const std::string& x) { return x.empty() || x == NONE; }
}
@@ -102,7 +93,7 @@ void HaBroker::initialize() {
broker.getSystem()->getNodeName(),
broker.getPort(broker::Broker::TCP_TRANSPORT),
systemId);
- QPID_LOG(notice, logPrefix() << "Initializing: " << brokerInfo);
+ QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -124,7 +115,7 @@ void HaBroker::initialize() {
status = JOINING;
backup.reset(new Backup(*this, settings));
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
- statusCheck.reset(new StatusCheck(logPrefix(), broker.getLinkHearbeatInterval(), brokerInfo));
+ statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo));
if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
}
@@ -137,7 +128,7 @@ void HaBroker::initialize() {
}
HaBroker::~HaBroker() {
- QPID_LOG(notice, logPrefix() << "Shut down");
+ QPID_LOG(notice, logPrefix << "Shut down");
broker.getConnectionObservers().remove(observer);
}
@@ -147,7 +138,7 @@ void HaBroker::recover() {
BrokerInfo::Set backups;
{
Mutex::ScopedLock l(lock);
- if (isPrimary(status.get())) {
+ if (isPrimary(status)) {
QPID_LOG(info, "Ignoring promotion, already primary: " << brokerInfo);
return;
}
@@ -183,12 +174,12 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
recover();
else {
QPID_LOG(error,
- logPrefix() << "Joining active cluster, cannot be promoted.");
+ logPrefix << "Joining active cluster, cannot be promoted.");
throw Exception("Cluster already active, cannot be promoted.");
}
break;
case CATCHUP:
- QPID_LOG(error, logPrefix() << "Still catching up, cannot be promoted.");
+ QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
throw Exception("Still catching up, cannot be promoted.");
break;
case READY: recover(); break;
@@ -209,7 +200,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, logPrefix() << "Replicate individual queue "
+ QPID_LOG(debug, logPrefix << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
@@ -244,7 +235,7 @@ void HaBroker::setPublicUrl(const Url& url) {
mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, logPrefix() << "Setting public URL to: " << url);
+ QPID_LOG(debug, logPrefix << "Setting public URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
@@ -253,8 +244,8 @@ void HaBroker::setBrokerUrl(const Url& url) {
Mutex::ScopedLock l(lock);
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, logPrefix() << "Brokers URL set to: " << url);
- if (status.get() == JOINING && statusCheck.get()) statusCheck->setUrl(url);
+ QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
+ if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url);
b = backup;
}
if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock
@@ -266,12 +257,12 @@ std::vector<Url> HaBroker::getKnownBrokers() const {
}
void HaBroker::shutdown() {
- QPID_LOG(critical, logPrefix() << "Critical error, shutting down.");
+ QPID_LOG(critical, logPrefix << "Critical error, shutting down.");
broker.shutdown();
}
BrokerStatus HaBroker::getStatus() const {
- return status.get();
+ return status;
}
void HaBroker::setStatus(BrokerStatus newStatus) {
@@ -300,12 +291,12 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) {
} // namespace
void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
- QPID_LOG(info, logPrefix() << "Status change: "
- << printable(status.get()) << " -> " << printable(newStatus));
- bool legal = checkTransition(status.get(), newStatus);
+ QPID_LOG(info, logPrefix << "Status change: "
+ << printable(status) << " -> " << printable(newStatus));
+ bool legal = checkTransition(status, newStatus);
if (!legal) {
- QPID_LOG(critical, logPrefix() << "Illegal state transition: "
- << printable(status.get()) << " -> " << printable(newStatus));
+ QPID_LOG(critical, logPrefix << "Illegal state transition: "
+ << printable(status) << " -> " << printable(newStatus));
shutdown();
}
assert(legal); // FIXME aconway 2012-12-07: fail
@@ -314,15 +305,15 @@ void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
}
void HaBroker::statusChanged(Mutex::ScopedLock& l) {
- mgmtObject->set_status(printable(status.get()).str());
- brokerInfo.setStatus(status.get());
+ mgmtObject->set_status(printable(status).str());
+ brokerInfo.setStatus(status);
membership.add(brokerInfo);
membershipUpdated(l);
setLinkProperties(l);
}
void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
- QPID_LOG(info, logPrefix() << "Membership: " << membership);
+ QPID_LOG(info, logPrefix << "Membership: " << membership);
Variant::List brokers = membership.asList();
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
@@ -337,15 +328,15 @@ void HaBroker::setMembership(const Variant::List& brokers) {
// Update my status to what the primary says it is. The primary sets
// status to READY when we are caught up, and sets status to CATCHUP
// (from READY) if we are timed out during recovery.
- if (membership.get(systemId, info) && status.get() != info.getStatus()) {
- assert((status.get() == CATCHUP && info.getStatus() == READY) ||
- (status.get() == READY && info.getStatus() == CATCHUP));
+ if (membership.get(systemId, info) && status != info.getStatus()) {
+ assert((status == CATCHUP && info.getStatus() == READY) ||
+ (status == READY && info.getStatus() == CATCHUP));
setStatus(info.getStatus(), l);
b = backup;
}
membershipUpdated(l);
}
- if (b) b->setStatus(status.get()); // Oustside lock, avoid deadlock
+ if (b) b->setStatus(status); // Oustside lock, avoid deadlock
}
void HaBroker::addBroker(const BrokerInfo& b) {
@@ -365,7 +356,7 @@ void HaBroker::removeBroker(const Uuid& id) {
void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
framing::FieldTable linkProperties = broker.getLinkClientProperties();
- if (isBackup(status.get())) {
+ if (isBackup(status)) {
// If this is a backup then any outgoing links are backup
// links and need to be tagged.
linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index e6d6e1a94f..f2a837ddea 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -32,7 +32,6 @@
#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include "qpid/management/Manageable.h"
#include "qpid/types/Variant.h"
-#include "qpid/sys/AtomicValue.h"
#include <set>
#include <boost/shared_ptr.hpp>
@@ -108,7 +107,6 @@ class HaBroker : public management::Manageable
void recover();
void statusChanged(sys::Mutex::ScopedLock&);
void setLinkProperties(sys::Mutex::ScopedLock&);
- std::string logPrefix() const;
std::vector<Url> getKnownBrokers() const;
@@ -125,7 +123,8 @@ class HaBroker : public management::Manageable
qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
Url publicUrl, brokerUrl;
std::vector<Url> knownBrokers;
- sys::AtomicValue<BrokerStatus> status;
+ BrokerStatus status;
+ std::string logPrefix;
BrokerInfo brokerInfo;
Membership membership;
ReplicationTest replicationTest;