summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-08-19 22:34:15 +0000
committerAlan Conway <aconway@apache.org>2014-08-19 22:34:15 +0000
commit269580b80a3242f7fb6e7dbfff9859e43bbad8fd (patch)
tree9f8fa5faa01ff9234cb5570e087bf1664b10c876 /qpid/cpp/src
parent451bda18227dccc91a08fe1ade559d0f91be932d (diff)
downloadqpid-python-269580b80a3242f7fb6e7dbfff9859e43bbad8fd.tar.gz
QPID-6020: HA logging improvements - log prefix with status and ID.
Include broker status and ID in (almost) all logging messages. Makes it much easier to track broker state and interactions. Sundry other logging improvements including: - Demote noisy messages to trace - connections from rgmanager status checks, searching for primary. - Rationalise start-up messages. - Improved queue state detail replicating subscription and queue guard initialization. - Fail to prepare TX is error. - Collect all primary TX errors into one. - Fix status of catchup brokers in primary membership for logging. - Add process name/PID info to client connection messages. - Various minor message tweaks. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt20
-rw-r--r--qpid/cpp/src/qpid/broker/TxBuffer.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h5
-rw-r--r--qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h8
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.h9
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp66
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h3
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp16
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.h5
-rw-r--r--qpid/cpp/src/qpid/ha/FailoverExchange.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp30
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h4
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/IdSetter.h12
-rw-r--r--qpid/cpp/src/qpid/ha/LogPrefix.cpp35
-rw-r--r--qpid/cpp/src/qpid/ha/LogPrefix.h75
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp25
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.h3
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp64
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h4
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h5
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp19
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h5
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h5
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp29
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h5
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.h5
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp21
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h3
-rw-r--r--qpid/cpp/src/qpid/ha/Role.h3
-rw-r--r--qpid/cpp/src/qpid/ha/StandAlone.h4
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp30
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h6
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp4
38 files changed, 368 insertions, 203 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index ebdff57961..568cec84b8 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -522,7 +522,6 @@ option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default})
if (BUILD_HA)
set (ha_SOURCES
- qpid/ha/QueueSnapshot.h
qpid/ha/AlternateExchangeSetter.h
qpid/ha/Backup.cpp
qpid/ha/Backup.h
@@ -533,24 +532,29 @@ if (BUILD_HA)
qpid/ha/BrokerReplicator.h
qpid/ha/ConnectionObserver.cpp
qpid/ha/ConnectionObserver.h
- qpid/ha/Event.cpp
- qpid/ha/Event.h
+ qpid/ha/Event.cpp
+ qpid/ha/Event.h
qpid/ha/FailoverExchange.cpp
qpid/ha/FailoverExchange.h
qpid/ha/HaBroker.cpp
qpid/ha/HaBroker.h
qpid/ha/HaPlugin.cpp
- qpid/ha/IdSetter.h
- qpid/ha/QueueSnapshot.h
+ qpid/ha/IdSetter.h
+ qpid/ha/LogPrefix.cpp
+ qpid/ha/LogPrefix.h
qpid/ha/Membership.cpp
qpid/ha/Membership.h
qpid/ha/Primary.cpp
qpid/ha/Primary.h
qpid/ha/PrimaryQueueLimits.h
+ qpid/ha/PrimaryTxObserver.cpp
+ qpid/ha/PrimaryTxObserver.h
qpid/ha/QueueGuard.cpp
qpid/ha/QueueGuard.h
qpid/ha/QueueReplicator.cpp
qpid/ha/QueueReplicator.h
+ qpid/ha/QueueSnapshot.h
+ qpid/ha/QueueSnapshot.h
qpid/ha/RemoteBackup.cpp
qpid/ha/RemoteBackup.h
qpid/ha/ReplicatingSubscription.cpp
@@ -564,11 +568,9 @@ if (BUILD_HA)
qpid/ha/StatusCheck.h
qpid/ha/TxReplicatingSubscription.cpp
qpid/ha/TxReplicatingSubscription.h
- qpid/ha/PrimaryTxObserver.cpp
- qpid/ha/PrimaryTxObserver.h
+ qpid/ha/TxReplicator.cpp
+ qpid/ha/TxReplicator.h
qpid/ha/types.cpp
- qpid/ha/TxReplicator.cpp
- qpid/ha/TxReplicator.h
qpid/ha/types.h
)
diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.cpp b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
index 6dc2f5c2f4..f7552f16a3 100644
--- a/qpid/cpp/src/qpid/broker/TxBuffer.cpp
+++ b/qpid/cpp/src/qpid/broker/TxBuffer.cpp
@@ -96,7 +96,8 @@ std::string TxBuffer::endCommit(TransactionalStore* const store) {
void TxBuffer::setError(const std::string& e) {
QPID_LOG(error, "Asynchronous transaction error: " << e);
sys::Mutex::ScopedLock l(errorLock);
- error = e;
+ if (!error.empty()) error += " ";
+ error += e;
}
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 9d50b1c665..f1b6eadd75 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -49,7 +49,7 @@ using std::string;
using sys::Mutex;
Backup::Backup(HaBroker& hb, const Settings& s) :
- logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
+ logPrefix(hb.logPrefix), membership(hb.getMembership()), stopped(false),
haBroker(hb), broker(hb.getBroker()), settings(s),
statusCheck(new StatusCheck(hb))
{}
@@ -60,7 +60,7 @@ void Backup::setBrokerUrl(const Url& brokers) {
if (stopped) return;
if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers);
if (!link) { // Not yet initialized
- QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
+ QPID_LOG(info, logPrefix << "Connecting to cluster: " << brokers);
string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
types::Uuid uuid(true);
link = broker.getLinks().declare(
@@ -78,7 +78,6 @@ void Backup::setBrokerUrl(const Url& brokers) {
void Backup::stop(Mutex::ScopedLock&) {
if (stopped) return;
stopped = true;
- QPID_LOG(debug, logPrefix << "Leaving backup role.");
if (link) link->close();
if (replicator.get()) {
replicator->shutdown();
@@ -106,8 +105,7 @@ Role* Backup::promote() {
case JOINING:
if (statusCheck->canPromote()) return recover(l);
else {
- QPID_LOG(error,
- logPrefix << "Joining active cluster, cannot be promoted.");
+ QPID_LOG(error, logPrefix << "Joining active cluster, cannot be promoted.");
throw Exception("Joining active cluster, cannot be promoted.");
}
break;
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 88194158ce..47c44aa59c 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -22,6 +22,7 @@
*
*/
+#include "LogPrefix.h"
#include "Role.h"
#include "Settings.h"
#include "qpid/Url.h"
@@ -53,8 +54,6 @@ class Backup : public Role
Backup(HaBroker&, const Settings&);
~Backup();
- std::string getLogPrefix() const { return logPrefix; }
-
void setBrokerUrl(const Url&);
Role* promote();
@@ -65,7 +64,7 @@ class Backup : public Role
void stop(sys::Mutex::ScopedLock&);
Role* recover(sys::Mutex::ScopedLock&);
- std::string logPrefix;
+ const LogPrefix& logPrefix;
Membership& membership;
sys::Mutex lock;
diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
index 5a67cde922..a58e666fa7 100644
--- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
@@ -22,6 +22,7 @@
*
*/
+#include "LogPrefix.h"
#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
@@ -35,12 +36,17 @@ namespace ha {
class BackupConnectionExcluder : public broker::ConnectionObserver
{
public:
+ BackupConnectionExcluder(const LogPrefix& lp) : logPrefix(lp) {}
+
void opened(broker::Connection& connection) {
- QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId());
+ QPID_LOG(trace, logPrefix << "Rejected connection "+connection.getMgmtId());
connection.abort();
}
void closed(broker::Connection&) {}
+
+ private:
+ const LogPrefix& logPrefix;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
index a13451e179..c8a652a7ab 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -92,7 +92,7 @@ void BrokerInfo::assign(const Variant::Map& m) {
}
std::ostream& BrokerInfo::printId(std::ostream& o) const {
- o << getSystemId().str().substr(0,8);
+ o << shortStr(getSystemId());
if (getAddress() != empty) o << "@" << getAddress();
return o;
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h
index 87d51f5113..92556a5c4b 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.h
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h
@@ -63,9 +63,14 @@ class BrokerInfo
void assign(const types::Variant::Map&);
// So it can be put in a set.
- bool operator<(const BrokerInfo x) const { return systemId < x.systemId; }
+ bool operator<(const BrokerInfo& x) const { return systemId < x.systemId; }
- // Print just the identifying information, not the status.
+ bool operator==(const BrokerInfo& x) const
+ { return address == x.address && systemId == x.systemId && status == x.status; }
+
+ bool operator!=(const BrokerInfo& x) const { return !(*this == x); }
+
+ // Print just the identifying information (shortId@address), not the status.
std::ostream& printId(std::ostream& o) const;
private:
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 3957ef5a0c..a62080932d 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -175,7 +175,7 @@ Variant::Map asMapVoid(const Variant& value) {
// Report errors on the broker replication session.
class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorListener {
public:
- ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+ ErrorListener(const LogPrefix& lp) : logPrefix(lp) {}
void connectionException(framing::connection::CloseCode code, const std::string& msg) {
QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
@@ -189,12 +189,10 @@ class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorList
void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
- void detach() {
- QPID_LOG(debug, logPrefix << "Session detached.");
- }
+ void detach() {}
private:
- std::string logPrefix;
+ const LogPrefix& logPrefix;
};
/** Keep track of queues or exchanges during the update process to solve 2
@@ -213,8 +211,9 @@ class BrokerReplicator::UpdateTracker {
typedef boost::function<void (const std::string&)> CleanFn;
UpdateTracker(const std::string& type_, // "queue" or "exchange"
- CleanFn f)
- : type(type_), cleanFn(f) {}
+ CleanFn f,
+ const LogPrefix& lp)
+ : type(type_), cleanFn(f), logPrefix(lp) {}
/** Destructor cleans up remaining initial queues. */
~UpdateTracker() {
@@ -224,7 +223,7 @@ class BrokerReplicator::UpdateTracker {
boost::bind(&UpdateTracker::clean, this, _1));
}
catch (const std::exception& e) {
- QPID_LOG(error, "Error in cleanup of lost objects: " << e.what());
+ QPID_LOG(error, logPrefix << "Error in cleanup of lost objects: " << e.what());
}
}
@@ -251,7 +250,7 @@ class BrokerReplicator::UpdateTracker {
private:
void clean(const std::string& name) {
- QPID_LOG(debug, "Backup: Deleted " << type << " " << name <<
+ QPID_LOG(debug, logPrefix << "Deleted " << type << " " << name <<
": no longer exists on primary");
try { cleanFn(name); }
catch (const framing::NotFoundException&) {}
@@ -260,6 +259,7 @@ class BrokerReplicator::UpdateTracker {
std::string type;
Names initial, events;
CleanFn cleanFn;
+ const LogPrefix& logPrefix;
};
namespace {
@@ -279,7 +279,7 @@ boost::shared_ptr<BrokerReplicator> BrokerReplicator::create(
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix("Backup: "), replicationTest(NONE),
+ logPrefix(hb.logPrefix), replicationTest(NONE),
haBroker(hb), broker(hb.getBroker()),
exchanges(broker.getExchanges()), queues(broker.getQueues()),
link(l),
@@ -372,19 +372,20 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
link->getRemoteAddress(primary);
string queueName = bridge.getQueueName();
- QPID_LOG(notice, logPrefix << (initialized ? "Failing over" : "Connecting")
- << " to primary " << primary
- << " status:" << printable(haBroker.getStatus()));
+ QPID_LOG(info, logPrefix << (initialized ? "Failing over" : "Connecting")
+ << " to primary " << primary);
initialized = true;
exchangeTracker.reset(
new UpdateTracker("exchange",
- boost::bind(&BrokerReplicator::deleteExchange, this, _1)));
+ boost::bind(&BrokerReplicator::deleteExchange, this, _1),
+ logPrefix));
exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1));
queueTracker.reset(
new UpdateTracker("queue",
- boost::bind(&BrokerReplicator::deleteQueue, this, _1, true)));
+ boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
+ logPrefix));
queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1));
framing::AMQP_ServerProxy peer(sessionHandler.out);
@@ -417,14 +418,14 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
// Called for each queue in existence when the backup connects to a primary.
void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) {
if (replicationTest.getLevel(*q)) {
- QPID_LOG(debug, "Existing queue: " << q->getName());
+ QPID_LOG(debug, logPrefix << "Existing queue: " << q->getName());
queueTracker->addQueue(q);
}
}
void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex) {
if (replicationTest.getLevel(*ex)) {
- QPID_LOG(debug, "Existing exchange: " << ex->getName());
+ QPID_LOG(debug, logPrefix << "Existing exchange: " << ex->getName());
exchangeTracker->addExchange(ex);
}
}
@@ -447,7 +448,7 @@ void BrokerReplicator::route(Deliverable& msg) {
if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(trace, "Broker replicator event: " << map);
+ QPID_LOG(trace, logPrefix << "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
std::string key = (schema[PACKAGE_NAME].asString() +
@@ -459,7 +460,7 @@ void BrokerReplicator::route(Deliverable& msg) {
} else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(trace, "Broker replicator response: " << map);
+ QPID_LOG(trace, logPrefix << "Broker replicator response: " << map);
string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
@@ -691,8 +692,7 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
if (exchange &&
exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID))
{
- QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: "
- << name);
+ QPID_LOG(warning, logPrefix << "Exchange response replacing (UUID mismatch): " << name);
deleteExchange(name);
}
CreateExchangeResult result = createExchange(
@@ -793,21 +793,17 @@ void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
}
void BrokerReplicator::deleteExchange(const std::string& name) {
- try {
- boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
- if (!exchange) {
- QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
- return;
- }
- if (exchange->inUseAsAlternate()) {
- QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name);
- return;
- }
- broker.deleteExchange(name, userId, remoteHost);
- QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
- } catch (const framing::NotFoundException&) {
- QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name);
+ boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
+ if (!exchange) {
+ QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
+ return;
+ }
+ if (exchange->inUseAsAlternate()) {
+ QPID_LOG(warning, logPrefix << "Cannot delete exchange, in use as alternate: " << name);
+ return;
}
+ broker.deleteExchange(name, userId, remoteHost);
+ QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
}
boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 1e051878ae..44e80263de 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -52,6 +52,7 @@ class FieldTable;
}
namespace ha {
+class LogPrefix;
class HaBroker;
class QueueReplicator;
@@ -155,7 +156,7 @@ class BrokerReplicator : public broker::Exchange,
void setMembership(const types::Variant::List&); // Set membership from list.
- std::string logPrefix;
+ const LogPrefix& logPrefix;
ReplicationTest replicationTest;
std::string userId, remoteHost;
HaBroker& haBroker;
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
index c9c5c2e576..a824adb871 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -31,7 +31,7 @@ namespace qpid {
namespace ha {
ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
- : haBroker(hb), logPrefix("Backup: "), self(uuid) {}
+ : haBroker(hb), logPrefix(hb.logPrefix), self(uuid) {}
bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::BACKUP_TAG);
@@ -55,11 +55,10 @@ bool ConnectionObserver::getAddress(const broker::Connection& connection, Addres
return false;
}
-void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix)
+void ConnectionObserver::setObserver(const ObserverPtr& o)
{
sys::Mutex::ScopedLock l(lock);
observer = o;
- logPrefix = newlogPrefix;
}
ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
@@ -83,21 +82,21 @@ void ConnectionObserver::opened(broker::Connection& connection) {
// Set my own address if there is an address header.
Address addr;
if (getAddress(connection, addr)) haBroker.setAddress(addr);
- QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
+ QPID_LOG(trace, logPrefix << "Rejected self connection "+connection.getMgmtId());
connection.abort();
return;
}
if (connection.isLink()) return; // Allow outgoing links.
if (connection.getClientProperties().find(ADMIN_TAG) != connection.getClientProperties().end()) {
- QPID_LOG(debug, logPrefix << "Accepted admin connection: "
- << connection.getMgmtId());
+ QPID_LOG(trace, logPrefix << "Accepted admin connection: " << connection.getMgmtId());
return; // No need to call observer, always allow admins.
}
ObserverPtr o(getObserver());
if (o) o->opened(connection);
}
catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Open error: " << e.what());
+ QPID_LOG(error, logPrefix << "Error on incoming connection " << connection.getMgmtId()
+ << ": " << e.what());
throw;
}
}
@@ -109,7 +108,8 @@ void ConnectionObserver::closed(broker::Connection& connection) {
if (o) o->closed(connection);
}
catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Close error: " << e.what());
+ QPID_LOG(error, logPrefix << "Error closing incoming connection " << connection.getMgmtId()
+ << ": " << e.what());
throw;
}
}
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
index 079dc43be6..f447d479f0 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
@@ -34,6 +34,7 @@ struct Address;
namespace ha {
class BrokerInfo;
class HaBroker;
+class LogPrefix;
/**
* Observes connections, delegates to another ConnectionObserver for
@@ -59,7 +60,7 @@ class ConnectionObserver : public broker::ConnectionObserver
ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
- void setObserver(const ObserverPtr&, const std::string& logPrefix);
+ void setObserver(const ObserverPtr&);
ObserverPtr getObserver();
void reset();
@@ -72,7 +73,7 @@ class ConnectionObserver : public broker::ConnectionObserver
sys::Mutex lock;
HaBroker& haBroker;
- std::string logPrefix;
+ const LogPrefix& logPrefix;
ObserverPtr observer;
types::Uuid self;
};
diff --git a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
index f1b87c63c8..9bda5ea5bf 100644
--- a/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
+++ b/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
@@ -63,7 +63,6 @@ ostream& operator<<(ostream& o, const OstreamUrls& urls) {
FailoverExchange::FailoverExchange(management::Manageable& parent, Broker& b)
: Exchange(typeName, &parent, &b)
{
- QPID_LOG(debug, typeName << " created.");
if (mgmtExchange != 0)
mgmtExchange->set_type(typeName);
}
@@ -114,7 +113,7 @@ bool FailoverExchange::hasBindings() {
}
void FailoverExchange::route(Deliverable&) {
- QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring");
+ QPID_LOG(warning, typeName << " unexpected message, ignored.");
}
void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::ScopedLock&) {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index f154e45a22..7699b0e1d2 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -69,10 +69,16 @@ using boost::dynamic_pointer_cast;
//
class HaBroker::BrokerObserver : public broker::BrokerObserver {
public:
+ BrokerObserver(const LogPrefix& lp) : logPrefix(lp) {}
+
void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
- q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter(q->getName())));
+ q->getMessageInterceptors().add(
+ boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, q->getName())));
}
+
+ private:
+ const LogPrefix& logPrefix;
};
// Called in Plugin::earlyInitialize
@@ -83,20 +89,19 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
broker(b),
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
- membership(BrokerInfo(systemId, STANDALONE), *this),
+ membership(BrokerInfo(systemId, STANDALONE), *this), // Sets logPrefix
failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- QPID_LOG(debug, "Backup starting, rejecting client connections.");
- shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
- observer->setObserver(excluder, "Backup: ");
+ shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder(logPrefix));
+ observer->setObserver(excluder);
broker.getConnectionObservers().add(observer);
broker.getExchanges().registerExchange(failoverExchange);
}
- broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver()));
+ broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver(logPrefix)));
}
namespace {
@@ -107,8 +112,8 @@ bool isNone(const std::string& x) { return x.empty() || x == NONE; }
// Called in Plugin::initialize
void HaBroker::initialize() {
if (settings.cluster) {
+ QPID_LOG(info, logPrefix << "Starting HA broker");
membership.setStatus(JOINING);
- QPID_LOG(info, "Initializing HA broker: " << membership.getSelf());
}
// Set up the management object.
@@ -138,7 +143,6 @@ void HaBroker::initialize() {
}
HaBroker::~HaBroker() {
- QPID_LOG(notice, role->getLogPrefix() << "Shut down");
broker.getConnectionObservers().remove(observer);
}
@@ -160,7 +164,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
+ QPID_LOG(debug, logPrefix << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
@@ -195,7 +199,7 @@ void HaBroker::setPublicUrl(const Url& url) {
knownBrokers.push_back(url);
vector<Url> urls(1, url);
failoverExchange->updateUrls(urls);
- QPID_LOG(debug, role->getLogPrefix() << "Public URL set to: " << url);
+ QPID_LOG(debug, logPrefix << "Public URL set to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
@@ -203,7 +207,7 @@ void HaBroker::setBrokerUrl(const Url& url) {
Mutex::ScopedLock l(lock);
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
+ QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
}
role->setBrokerUrl(url); // Oustside lock
}
@@ -214,7 +218,7 @@ std::vector<Url> HaBroker::getKnownBrokers() const {
}
void HaBroker::shutdown(const std::string& message) {
- QPID_LOG(critical, "Shutting down: " << message);
+ QPID_LOG(critical, logPrefix << "Shutting down: " << message);
broker.shutdown();
throw Exception(message);
}
@@ -224,7 +228,7 @@ BrokerStatus HaBroker::getStatus() const {
}
void HaBroker::setAddress(const Address& a) {
- QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
+ QPID_LOG(info, logPrefix << "Set self address to: " << a);
membership.setSelfAddress(a);
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 9fadd4f35c..023706e7e3 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -25,6 +25,7 @@
#include "BrokerInfo.h"
#include "Membership.h"
#include "types.h"
+#include "LogPrefix.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "FailoverExchange.h"
@@ -101,6 +102,9 @@ class HaBroker : public management::Manageable
/** Authenticated user ID for queue create/delete */
std::string getUserId() const { return userId; }
+ /** logPrefix is thread safe and used by other classes (Membership) */
+ LogPrefix logPrefix;
+
private:
class BrokerObserver;
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index a958b0d29c..913a9b5084 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -76,7 +76,7 @@ struct HaPlugin : public Plugin {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && (settings.cluster || settings.queueReplication)) {
if (!broker->getManagementAgent()) {
- QPID_LOG(warning, "HA plugin disabled because management is disabled");
+ QPID_LOG(warning, "Cannot start HA: management is disabled");
if (settings.cluster)
throw Exception("Cannot start HA: management is disabled");
} else {
diff --git a/qpid/cpp/src/qpid/ha/IdSetter.h b/qpid/cpp/src/qpid/ha/IdSetter.h
index 0350bf1519..f0629c99bb 100644
--- a/qpid/cpp/src/qpid/ha/IdSetter.h
+++ b/qpid/cpp/src/qpid/ha/IdSetter.h
@@ -33,6 +33,7 @@
namespace qpid {
namespace ha {
+class LogPrefix;
/**
* A MessageInterceptor that sets the ReplicationId on each message as it is
@@ -43,16 +44,16 @@ namespace ha {
class IdSetter : public broker::MessageInterceptor
{
public:
- IdSetter(const std::string& q, ReplicationId firstId=1) : queue(q), nextId(firstId) {
- QPID_LOG(debug, "Replication-ID will be set for " << queue << " from " << firstId);
- }
+ IdSetter(const LogPrefix& lp, const std::string& q, ReplicationId firstId=1) :
+ logPrefix(lp), queue(q), nextId(firstId)
+ {}
void record(broker::Message& m) {
// Record is called when a message is first delivered to a queue, before it has
// been enqueued or saved in a transaction buffer. This is when we normally want
// to assign a replication-id.
m.setReplicationId(nextId++);
- QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m.getReplicationId()));
+ QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m.getReplicationId()));
}
void publish(broker::Message& m) {
@@ -62,11 +63,12 @@ class IdSetter : public broker::MessageInterceptor
// store record() is not called, so set the ID now if not already set.
if (!m.hasReplicationId()) {
m.setReplicationId(nextId++);
- QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m));
+ QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m));
}
}
private:
+ const LogPrefix& logPrefix;
std::string queue;
sys::AtomicValue<uint32_t> nextId;
};
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/LogPrefix.cpp
new file mode 100644
index 0000000000..c1ccf050c1
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/LogPrefix.cpp
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LogPrefix.h"
+#include <iostream>
+
+namespace qpid {
+namespace ha {
+
+std::ostream& operator<<(std::ostream& o, const LogPrefix& lp) {
+ return o << lp.get();
+}
+
+std::ostream& operator<<(std::ostream& o, const LogPrefix2& lp) {
+ return o << lp.prePrefix.get() << lp.get();
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.h b/qpid/cpp/src/qpid/ha/LogPrefix.h
new file mode 100644
index 0000000000..3b6bb17d99
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/LogPrefix.h
@@ -0,0 +1,75 @@
+#ifndef QPID_HA_LOGPREFIX_H
+#define QPID_HA_LOGPREFIX_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/sys/Mutex.h>
+#include <string>
+#include <iosfwd>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Thread safe string holder to hold a string that may be read and modified concurrently.
+ */
+class LogPrefix
+{
+ public:
+ explicit LogPrefix(const std::string& s=std::string()) : prefix(s) {}
+ void set(const std::string& s) { sys::RWlock::ScopedWlock l(lock); prefix = s; }
+ std::string get() const { sys::RWlock::ScopedRlock l(lock); return prefix; }
+
+ LogPrefix& operator=(const std::string& s) { set(s); return *this; }
+ operator std::string() const { return get(); }
+
+ private:
+ // Undefined, not copyable.
+ LogPrefix(const LogPrefix& lp);
+ LogPrefix& operator=(const LogPrefix&);
+
+ mutable sys::RWlock lock;
+ std::string prefix;
+};
+std::ostream& operator<<(std::ostream& o, const LogPrefix& lp);
+
+/**
+ * A two-part log prefix with a reference to a pre-prefix and a post-prefix.
+ * Operator << will print both parts, get/set just manage the post-prefix.
+ */
+class LogPrefix2 : public LogPrefix {
+ public:
+ const LogPrefix& prePrefix;
+ explicit LogPrefix2(const LogPrefix& lp, const std::string& s=std::string()) : LogPrefix(s), prePrefix(lp) {}
+ LogPrefix2& operator=(const std::string& s) { set(s); return *this; }
+
+ private:
+ // Undefined, not copyable.
+ LogPrefix2(const LogPrefix2& lp);
+ LogPrefix2& operator=(const LogPrefix2&);
+};
+std::ostream& operator<<(std::ostream& o, const LogPrefix2& lp);
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_LOGPREFIX_H*/
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index fdb47014d9..92a0b7db70 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -43,9 +43,14 @@ Membership::Membership(const BrokerInfo& info, HaBroker& b)
: haBroker(b), self(info.getSystemId())
{
brokers[self] = info;
+ setPrefix();
oldStatus = info.getStatus();
}
+void Membership::setPrefix() {
+ haBroker.logPrefix = Msg() << shortStr(brokers[self].getSystemId())
+ << "(" << printable(brokers[self].getStatus()) << ") ";
+}
void Membership::clear() {
Mutex::ScopedLock l(lock);
BrokerInfo me = brokers[self];
@@ -57,7 +62,7 @@ void Membership::add(const BrokerInfo& b) {
Mutex::ScopedLock l(lock);
assert(b.getSystemId() != self);
brokers[b.getSystemId()] = b;
- update(l);
+ update(true, l);
}
@@ -67,7 +72,7 @@ void Membership::remove(const types::Uuid& id) {
BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
- update(l);
+ update(true, l);
}
}
@@ -83,7 +88,7 @@ void Membership::assign(const types::Variant::List& list) {
BrokerInfo b(i->asMap());
brokers[b.getSystemId()] = b;
}
- update(l);
+ update(true, l);
}
types::Variant::List Membership::asList() const {
@@ -144,8 +149,7 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) {
}
} // namespace
-void Membership::update(Mutex::ScopedLock& l) {
- QPID_LOG(info, "Membership: " << brokers);
+void Membership::update(bool log, Mutex::ScopedLock& l) {
// Update managment and send update event.
BrokerStatus newStatus = getStatus(l);
Variant::List brokerList = asList(l);
@@ -171,27 +175,30 @@ void Membership::update(Mutex::ScopedLock& l) {
// Check status transitions
if (oldStatus != newStatus) {
- QPID_LOG(info, "Status change: "
+ QPID_LOG(info, haBroker.logPrefix << "Status change: "
<< printable(oldStatus) << " -> " << printable(newStatus));
if (!checkTransition(oldStatus, newStatus)) {
haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(oldStatus)
<< " -> " << printable(newStatus)));
}
oldStatus = newStatus;
+ setPrefix();
+ if (newStatus == READY) QPID_LOG(notice, haBroker.logPrefix << "Backup is ready");
}
+ if (log) QPID_LOG(info, haBroker.logPrefix << "Membership update: " << brokers);
}
void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
Mutex::ScopedLock l(lock);
mgmtObject = mo;
- update(l);
+ update(false, l);
}
void Membership::setStatus(BrokerStatus newStatus) {
Mutex::ScopedLock l(lock);
brokers[self].setStatus(newStatus);
- update(l);
+ update(false, l);
}
BrokerStatus Membership::getStatus() const {
@@ -215,7 +222,7 @@ BrokerInfo Membership::getSelf() const {
void Membership::setSelfAddress(const Address& a) {
Mutex::ScopedLock l(lock);
brokers[self].setAddress(a);
- update(l);
+ update(false, l);
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h
index 5b2b72e2fc..5590d255fa 100644
--- a/qpid/cpp/src/qpid/ha/Membership.h
+++ b/qpid/cpp/src/qpid/ha/Membership.h
@@ -85,7 +85,8 @@ class Membership
void setSelfAddress(const Address&);
private:
- void update(sys::Mutex::ScopedLock&);
+ void setPrefix();
+ void update(bool log, sys::Mutex::ScopedLock&);
BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
types::Variant::List asList(sys::Mutex::ScopedLock&) const;
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index dd41f74790..3790d14626 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -38,6 +38,7 @@
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
+#include "qpid/types/Variant.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
@@ -54,6 +55,10 @@ using namespace framing;
namespace {
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+const std::string CLIENT_PPID("qpid.client_ppid");
+
class PrimaryConnectionObserver : public broker::ConnectionObserver
{
public:
@@ -90,7 +95,7 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
public:
- PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+ PrimaryErrorListener(const LogPrefix& lp) : logPrefix(lp) {}
void connectionException(framing::connection::CloseCode code, const std::string& msg) {
QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
@@ -104,17 +109,15 @@ class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
- void detach() {
- QPID_LOG(debug, logPrefix << "Session detached.");
- }
+ void detach() {}
private:
- std::string logPrefix;
+ const LogPrefix& logPrefix;
};
class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
public:
- PrimarySessionHandlerObserver(const std::string& logPrefix)
+ PrimarySessionHandlerObserver(const LogPrefix& logPrefix)
: errorListener(new PrimaryErrorListener(logPrefix)) {}
void newSessionHandler(broker::SessionHandler& sh) {
BrokerInfo info;
@@ -133,7 +136,7 @@ class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
- logPrefix("Primary: "), active(false),
+ logPrefix(hb.logPrefix), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
@@ -142,6 +145,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// So we are safe from client interference while we set up the primary.
hb.getMembership().setStatus(RECOVERING);
+ QPID_LOG(notice, logPrefix << "Promoted to primary");
// Process all QueueReplicators, handles auto-delete queues.
QueueReplicator::Vector qrs;
@@ -152,10 +156,9 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// NOTE: RemoteBackups must be created before we set the BrokerObserver
// or ConnectionObserver so that there is no client activity while
// the QueueGuards are created.
- QPID_LOG(notice, logPrefix << "Promoted and recovering, waiting for backups: "
- << expect);
+ QPID_LOG(notice, logPrefix << "Recovering backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
- boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0, logPrefix));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
setCatchupQueues(backup, true); // Create guards
@@ -173,7 +176,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// Allow client connections
connectionObserver.reset(new PrimaryConnectionObserver(*this));
- haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
+ haBroker.getObserver()->setObserver(connectionObserver);
}
Primary::~Primary() {
@@ -191,8 +194,8 @@ void Primary::checkReady() {
activate = active = true;
}
if (activate) {
- QPID_LOG(notice, logPrefix << "Promoted and active.");
membership.setStatus(ACTIVE); // Outside of lock.
+ QPID_LOG(notice, logPrefix << "All backups recovered.");
}
}
@@ -205,7 +208,7 @@ void Primary::checkReady(boost::shared_ptr<RemoteBackup> backup) {
info.setStatus(READY);
membership.add(info);
if (expectedBackups.erase(backup)) {
- QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
+ QPID_LOG(info, logPrefix << "Recovering backup is ready: " << info);
ready = true;
}
else
@@ -229,7 +232,7 @@ void Primary::timeoutExpectedBackups() {
boost::shared_ptr<RemoteBackup> backup = *j;
if (!backup->getConnection()) {
BrokerInfo info = backup->getBrokerInfo();
- QPID_LOG(error, logPrefix << "Expected backup timed out: " << info);
+ QPID_LOG(error, logPrefix << "Recovering backup timed out: " << info);
backupDisconnect(backup, l); // Calls erase(j)
// Keep broker in membership but downgrade status to CATCHUP.
// The broker will get this status change when it eventually connects.
@@ -303,6 +306,8 @@ void Primary::queueCreate(const QueuePtr& q) {
ReplicateLevel level = replicationTest.useLevel(*q);
q->addArgument(QPID_REPLICATE, printable(level).str());
if (level) {
+ QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+ << " replication: " << printable(level));
// Give each queue a unique id. Used by backups to avoid confusion of
// same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
@@ -312,8 +317,6 @@ void Primary::queueCreate(const QueuePtr& q) {
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueCreate(q);
}
- QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
- << " replication: " << printable(level));
checkReady(); // Outside lock
}
}
@@ -358,7 +361,7 @@ void Primary::exchangeDestroy(const ExchangePtr& ex) {
shared_ptr<RemoteBackup> Primary::backupConnect(
const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&)
{
- shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
+ shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection, logPrefix));
queueLimits.addBackup(backup);
backups[info.getSystemId()] = backup;
return backup;
@@ -382,7 +385,15 @@ void Primary::opened(broker::Connection& connection) {
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
+ if (info.getStatus() == JOINING) {
+ info.setStatus(CATCHUP);
+ membership.add(info);
+ }
if (i == backups.end()) {
+ if (info.getStatus() == JOINING) {
+ info.setStatus(CATCHUP);
+ membership.add(info);
+ }
QPID_LOG(info, logPrefix << "New backup connection: " << info);
backup = backupConnect(info, connection, l);
}
@@ -397,13 +408,20 @@ void Primary::opened(broker::Connection& connection) {
i->second->setConnection(&connection);
backup = i->second;
}
- if (info.getStatus() == JOINING) {
- info.setStatus(CATCHUP);
- membership.add(info);
+ }
+ else {
+ const types::Variant::Map& properties = connection.getClientProperties();
+ std::ostringstream pinfo;
+ types::Variant::Map::const_iterator i = properties.find(CLIENT_PROCESS_NAME);
+ // FIXME aconway 2014-08-13: Conditional on logging.
+ if (i != properties.end()) {
+ pinfo << " " << i->second;
+ i = properties.find(CLIENT_PID);
+ if (i != properties.end())
+ pinfo << "(" << i->second << ")";
}
+ QPID_LOG(info, logPrefix << "Accepted client connection " << connection.getMgmtId() << pinfo.str());
}
- else
- QPID_LOG(debug, logPrefix << "Accepted client connection " << connection.getMgmtId());
// Outside lock
if (backup) {
@@ -448,7 +466,7 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI
}
Role* Primary::promote() {
- QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+ QPID_LOG(info, logPrefix << "Ignoring promotion, already primary");
return 0;
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 46cf990834..84d714fc01 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -25,6 +25,7 @@
#include "types.h"
#include "hash.h"
#include "BrokerInfo.h"
+#include "LogPrefix.h"
#include "PrimaryQueueLimits.h"
#include "ReplicationTest.h"
#include "Role.h"
@@ -81,7 +82,6 @@ class Primary : public Role
~Primary();
// Role implementation
- std::string getLogPrefix() const { return logPrefix; }
Role* promote();
void setBrokerUrl(const Url&) {}
@@ -142,7 +142,7 @@ class Primary : public Role
mutable sys::Mutex lock;
HaBroker& haBroker;
Membership& membership;
- std::string logPrefix;
+ const LogPrefix& logPrefix;
bool active;
ReplicationTest replicationTest;
diff --git a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
index d614a48099..6d0c55736a 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
@@ -36,6 +36,7 @@ class Queue;
}
namespace ha {
+class LogPrefix;
class RemoteBackup;
/**
@@ -48,7 +49,7 @@ class PrimaryQueueLimits
{
public:
// FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated channel-max
- PrimaryQueueLimits(const std::string& lp,
+ PrimaryQueueLimits(const LogPrefix& lp,
broker::QueueRegistry& qr,
const ReplicationTest& rt
) :
@@ -97,7 +98,7 @@ class PrimaryQueueLimits
void removeBackup(const boost::shared_ptr<RemoteBackup>&) {}
private:
- std::string logPrefix;
+ const LogPrefix& logPrefix;
uint64_t maxQueues;
uint64_t queues;
};
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index 90046a8c5a..bbfcbba3a3 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -94,6 +94,7 @@ PrimaryTxObserver::PrimaryTxObserver(
Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx
) :
state(SENDING),
+ logPrefix(hb.logPrefix),
primary(p), haBroker(hb), broker(hb.getBroker()),
replicationTest(hb.getSettings().replicateDefault.get()),
txBuffer(tx),
@@ -101,7 +102,7 @@ PrimaryTxObserver::PrimaryTxObserver(
exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
empty(true)
{
- logPrefix = "Primary transaction "+shortStr(id)+": ";
+ logPrefix = "Primary TX "+shortStr(id)+": ";
// The brokers known at this point are the ones that will be included
// in the transaction. Brokers that join later are not included.
@@ -115,8 +116,7 @@ PrimaryTxObserver::PrimaryTxObserver(
for (size_t i = 0; i < incomplete.size(); ++i)
txBuffer->startCompleter();
- QPID_LOG(debug, logPrefix << "Started TX " << id);
- QPID_LOG(debug, logPrefix << "Backups: " << backups);
+ QPID_LOG(debug, logPrefix << "Started, backups " << backups);
}
void PrimaryTxObserver::initialize() {
@@ -140,9 +140,7 @@ void PrimaryTxObserver::initialize() {
}
-PrimaryTxObserver::~PrimaryTxObserver() {
- QPID_LOG(debug, logPrefix << "Ended");
-}
+PrimaryTxObserver::~PrimaryTxObserver() {}
void PrimaryTxObserver::checkState(State expect, const std::string& msg) {
if (state != expect)
@@ -254,7 +252,7 @@ void PrimaryTxObserver::end(Mutex::ScopedLock&) {
try {
broker.getExchanges().destroy(getExchangeName());
} catch (const std::exception& e) {
- QPID_LOG(error, logPrefix << "Deleting transaction exchange: " << e.what());
+ QPID_LOG(error, logPrefix << "Deleting TX exchange: " << e.what());
}
}
@@ -266,11 +264,12 @@ bool PrimaryTxObserver::completed(const Uuid& id, Mutex::ScopedLock&) {
return false;
}
-bool PrimaryTxObserver::error(const Uuid& id, const char* msg, Mutex::ScopedLock& l)
+bool PrimaryTxObserver::error(const Uuid& id, const std::string& msg, Mutex::ScopedLock& l)
{
if (incomplete.find(id) != incomplete.end()) {
// Note: setError before completed since completed may trigger completion.
- txBuffer->setError(QPID_MSG(logPrefix << msg << id));
+ // Only use the TX part of the log prefix.
+ txBuffer->setError(Msg() << logPrefix.get() << msg << shortStr(id) << ".");
completed(id, l);
return true;
}
@@ -290,7 +289,7 @@ void PrimaryTxObserver::txPrepareOkEvent(const string& data) {
void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
Mutex::ScopedLock l(lock);
types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
- if (error(backup, "Prepare failed on backup: ", l)) {
+ if (error(backup, "Prepare failed on backup ", l)) {
QPID_LOG(error, logPrefix << "Prepare failed on backup " << backup);
} else {
QPID_LOG(error, logPrefix << "Unexpected prepare-fail response from " << backup);
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 6ea1ba185b..6f445ee212 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -24,6 +24,7 @@
#include "types.h"
#include "ReplicationTest.h"
+#include "LogPrefix.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/log/Statement.h"
@@ -105,11 +106,11 @@ class PrimaryTxObserver : public broker::TransactionObserver,
void txPrepareOkEvent(const std::string& data);
void txPrepareFailEvent(const std::string& data);
bool completed(const types::Uuid& id, sys::Mutex::ScopedLock&);
- bool error(const types::Uuid& id, const char* msg, sys::Mutex::ScopedLock& l);
+ bool error(const types::Uuid& id, const std::string& msg, sys::Mutex::ScopedLock& l);
sys::Monitor lock;
State state;
- std::string logPrefix;
+ LogPrefix2 logPrefix;
Primary& primary;
HaBroker& haBroker;
broker::Broker& broker;
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 94b7a53937..b6b6037b6f 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -47,8 +47,8 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
-QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : cancelled(false), queue(q)
+QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info, const LogPrefix& lp)
+ : cancelled(false), logPrefix(lp), queue(q)
{
std::ostringstream os;
os << "Guard of " << queue.getName() << " at ";
@@ -61,7 +61,9 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
QueuePosition front, back;
q.getRange(front, back, broker::REPLICATOR);
first = back + 1;
- QPID_LOG(debug, logPrefix << "First guarded position " << first);
+ QPID_LOG(debug, logPrefix << "Guarded: front " << front
+ << ", back " << back
+ << ", guarded " << first);
}
QueueGuard::~QueueGuard() { cancel(); }
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 33967970eb..9bf61f31da 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -24,6 +24,7 @@
#include "types.h"
#include "hash.h"
+#include "LogPrefix.h"
#include "qpid/types/Uuid.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/unordered_map.h"
@@ -59,7 +60,7 @@ class ReplicatingSubscription;
*/
class QueueGuard {
public:
- QueueGuard(broker::Queue& q, const BrokerInfo&);
+ QueueGuard(broker::Queue& q, const BrokerInfo&, const LogPrefix&);
~QueueGuard();
/** QueueObserver override. Delay completion of the message.
@@ -97,7 +98,7 @@ class QueueGuard {
sys::Mutex lock;
QueuePosition first;
bool cancelled;
- std::string logPrefix;
+ LogPrefix2 logPrefix;
broker::Queue& queue;
Delayed delayed;
boost::shared_ptr<QueueObserver> observer;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 4abbb3affb..7997bc6aa9 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -22,6 +22,7 @@
#include "Event.h"
#include "HaBroker.h"
#include "IdSetter.h"
+#include "LogPrefix.h"
#include "QueueReplicator.h"
#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
@@ -97,12 +98,11 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
QPID_LOG(error, logPrefix << "Incoming "
<< framing::createSessionException(code, msg).what());
}
- void detach() {
- QPID_LOG(debug, logPrefix << "Session detached");
- }
+ void detach() {}
+
private:
boost::weak_ptr<QueueReplicator> queueReplicator;
- std::string logPrefix;
+ const LogPrefix& logPrefix;
};
class QueueReplicator::QueueObserver : public broker::QueueObserver {
@@ -152,11 +152,12 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
link(l),
queue(q),
sessionHandler(0),
- logPrefix("Backup of "+q->getName()+": "),
+ logPrefix(hb.logPrefix, "Backup of "+q->getName()+": "),
subscribed(false),
settings(hb.getSettings()),
nextId(0), maxId(0)
{
+ QPID_LOG(debug, logPrefix << "Created");
// The QueueReplicator will take over setting replication IDs.
boost::shared_ptr<IdSetter> setter =
q->getMessageInterceptors().findType<IdSetter>();
@@ -181,7 +182,6 @@ QueueReplicator::~QueueReplicator() {}
void QueueReplicator::initialize() {
Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "Created");
if (!queue) return; // Already destroyed
// Enable callback to route()
@@ -255,10 +255,13 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
- if (qs) arguments.set(ReplicatingSubscription::QPID_ID_SET,
- FieldTable::ValuePtr(
- new Var32Value(encodeStr(qs->getSnapshot()), TYPE_CODE_VBIN32)));
-
+ ReplicationIdSet snapshot;
+ if (qs) {
+ snapshot = qs->getSnapshot();
+ arguments.set(
+ ReplicatingSubscription::QPID_ID_SET,
+ FieldTable::ValuePtr(new Var32Value(encodeStr(snapshot), TYPE_CODE_VBIN32)));
+ }
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -268,12 +271,12 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
peer.getMessage().flow(getName(), 1, settings.getFlowBytes());
}
catch(const exception& e) {
- QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what()));
+ QPID_LOG(error, logPrefix << "Cannot connect to primary: " << e.what());
throw;
}
qpid::Address primary;
link->getRemoteAddress(primary);
- QPID_LOG(debug, logPrefix << "Connected to " << primary << "(" << bridgeName << ")");
+ QPID_LOG(debug, logPrefix << "Connected to " << primary << " snapshot=" << snapshot << " bridge=" << bridgeName);
QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
}
@@ -391,7 +394,7 @@ void QueueReplicator::promoted() {
// On primary QueueReplicator no longer sets IDs, start an IdSetter.
QPID_LOG(debug, logPrefix << "Promoted, first replication-id " << maxId+1)
queue->getMessageInterceptors().add(
- boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), maxId+1)));
+ boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, queue->getName(), maxId+1)));
// Process auto-deletes
if (queue->isAutoDelete()) {
// Make a temporary shared_ptr to prevent premature deletion of queue.
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 3d525db440..a4b31b6c9a 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -22,7 +22,10 @@
*
*/
+
+
#include "BrokerInfo.h"
+#include "LogPrefix.h"
#include "hash.h"
#include "qpid/broker/Exchange.h"
#include <boost/enable_shared_from_this.hpp>
@@ -134,7 +137,7 @@ class QueueReplicator : public broker::Exchange,
bool deletedOnPrimary(framing::execution::ErrorCode e, const std::string& msg);
- std::string logPrefix;
+ LogPrefix2 logPrefix;
std::string bridgeName;
bool subscribed;
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index c0a118d57f..c263d37e43 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -35,13 +35,13 @@ using sys::Mutex;
using boost::bind;
RemoteBackup::RemoteBackup(
- const BrokerInfo& info, broker::Connection* c
-) : brokerInfo(info), replicationTest(NONE), started(false), connection(c), reportedReady(false)
+ const BrokerInfo& info, broker::Connection* c, const LogPrefix& lp
+) : logPrefix(lp), brokerInfo(info), replicationTest(NONE),
+ started(false), connection(c), reportedReady(false)
{
std::ostringstream oss;
oss << "Remote backup at " << info << ": ";
logPrefix = oss.str();
- QPID_LOG(debug, logPrefix << (c? "Connected" : "Expected"));
}
RemoteBackup::~RemoteBackup() {
@@ -70,7 +70,7 @@ void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
QPID_LOG(debug, logPrefix << "Catch-up queue"
<< (createGuard ? " and guard" : "") << ": " << q->getName());
catchupQueues.insert(q);
- if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo));
+ if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo, logPrefix));
}
}
@@ -86,18 +86,12 @@ RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) {
void RemoteBackup::ready(const QueuePtr& q) {
catchupQueues.erase(q);
- if (catchupQueues.size()) {
- QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() << ", "
- << catchupQueues.size() << " remain to catch up");
- }
- else
- QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() );
}
// Called via BrokerObserver::queueCreate and from catchupQueue
void RemoteBackup::queueCreate(const QueuePtr& q) {
if (replicationTest.getLevel(*q) == ALL)
- guards[q].reset(new QueueGuard(*q, brokerInfo));
+ guards[q].reset(new QueueGuard(*q, brokerInfo, logPrefix));
}
// Called via BrokerObserver
@@ -112,6 +106,7 @@ void RemoteBackup::queueDestroy(const QueuePtr& q) {
bool RemoteBackup::reportReady() {
if (!reportedReady && isReady()) {
+ if (catchupQueues.empty()) QPID_LOG(debug, logPrefix << "Caught up.");
reportedReady = true;
return true;
}
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index b9e2e1a496..77c493d27e 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -22,6 +22,7 @@
*
*/
+#include "LogPrefix.h"
#include "ReplicationTest.h"
#include "BrokerInfo.h"
#include "types.h"
@@ -56,7 +57,7 @@ class RemoteBackup
/** Note: isReady() can be true after construction
*@param connected true if the backup is already connected.
*/
- RemoteBackup(const BrokerInfo&, broker::Connection*);
+ RemoteBackup(const BrokerInfo&, broker::Connection*, const LogPrefix&);
~RemoteBackup();
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
@@ -102,7 +103,7 @@ class RemoteBackup
typedef std::set<QueuePtr> QueueSet;
- std::string logPrefix;
+ LogPrefix2 logPrefix;
BrokerInfo brokerInfo;
ReplicationTest replicationTest;
GuardMap guards;
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 67e1e77681..e08a34529e 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -107,7 +107,7 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
-
+ logPrefix(hb.logPrefix),
position(0), wasStopped(false), ready(false), cancelled(false),
haBroker(hb),
primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
@@ -121,7 +121,7 @@ void ReplicatingSubscription::initialize() {
FieldTable ft;
if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
throw InvalidArgumentException(
- logPrefix+"Can't subscribe, no broker info: "+getTag());
+ logPrefix.get()+"Can't subscribe, no broker info: "+getTag());
info.assign(ft);
// Set a log prefix message that identifies the remote broker.
@@ -132,7 +132,7 @@ void ReplicatingSubscription::initialize() {
// If there's already a guard (we are in failover) use it, else create one.
if (primary) guard = primary->getGuard(queue, info);
- if (!guard) guard.reset(new QueueGuard(*queue, info));
+ if (!guard) guard.reset(new QueueGuard(*queue, info, logPrefix.prePrefix));
// NOTE: Once the observer is attached we can have concurrent
// calls to dequeued so we need to lock use of this->dequeues.
@@ -147,7 +147,7 @@ void ReplicatingSubscription::initialize() {
if (!snapshot) {
queue->getObservers().remove(
boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
- throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
+ throw ResourceDeletedException(logPrefix.get()+"Can't subscribe, queue deleted");
}
ReplicationIdSet primaryIds = snapshot->getSnapshot();
std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
@@ -166,10 +166,10 @@ void ReplicatingSubscription::initialize() {
// position >= front so if front is safe then position must be.
position = front;
- QPID_LOG(debug, logPrefix << "Subscribed: front " << front
- << ", back " << back
+ QPID_LOG(debug, logPrefix << "Subscribed: primary ["
+ << front << "," << back << "]=" << primaryIds
<< ", guarded " << guard->getFirst()
- << ", on backup " << skipEnqueue);
+ << ", backup (keep " << skipEnqueue << ", drop " << initDequeues << ")");
checkReady(l);
}
@@ -242,7 +242,12 @@ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
ready = true;
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
- QPID_LOG(debug, logPrefix << "Caught up");
+ if (position+1 >= guard->getFirst()) {
+ QPID_LOG(debug, logPrefix << "Caught up at " << position);
+ } else {
+ QPID_LOG(debug, logPrefix << "Caught up at " << position << "short of guard at " << guard->getFirst());
+ }
+
if (primary) primary->readyReplica(*this);
}
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 08c08b0ca3..d6d41dd2cf 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -23,6 +23,7 @@
*/
#include "BrokerInfo.h"
+#include "LogPrefix.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/QueueObserver.h"
@@ -144,7 +145,7 @@ class ReplicatingSubscription :
bool doDispatch();
private:
- std::string logPrefix;
+ LogPrefix2 logPrefix;
QueuePosition position;
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues.
diff --git a/qpid/cpp/src/qpid/ha/Role.h b/qpid/cpp/src/qpid/ha/Role.h
index 9986fde7e1..5392ce1fff 100644
--- a/qpid/cpp/src/qpid/ha/Role.h
+++ b/qpid/cpp/src/qpid/ha/Role.h
@@ -40,9 +40,6 @@ class Role
public:
virtual ~Role() {}
- /** Log prefix appropriate to the role */
- virtual std::string getLogPrefix() const = 0;
-
/** QMF promote method handler.
* @return The new role if promoted, 0 if not. Caller takes ownership.
*/
diff --git a/qpid/cpp/src/qpid/ha/StandAlone.h b/qpid/cpp/src/qpid/ha/StandAlone.h
index d052996d40..01bcf1a0b3 100644
--- a/qpid/cpp/src/qpid/ha/StandAlone.h
+++ b/qpid/cpp/src/qpid/ha/StandAlone.h
@@ -33,12 +33,8 @@ namespace ha {
class StandAlone : public Role
{
public:
- std::string getLogPrefix() const { return logPrefix; }
Role* promote() { return 0; }
void setBrokerUrl(const Url&) {}
-
- private:
- std::string logPrefix;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index ee8bd342b2..33adc9780d 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -39,10 +39,11 @@
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/framing/BufferTypes.h"
#include "qpid/log/Statement.h"
-#include <boost/shared_ptr.hpp>
-#include <boost/bind.hpp>
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/MessageTransferBody.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <sstream>
namespace qpid {
namespace ha {
@@ -57,15 +58,19 @@ namespace {
const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
} // namespace
-
-
bool TxReplicator::isTxQueue(const string& q) {
return startsWith(q, PREFIX);
}
-string TxReplicator::getTxId(const string& q) {
- assert(isTxQueue(q));
- return q.substr(PREFIX.size());
+Uuid TxReplicator::getTxId(const string& q) {
+ if (TxReplicator::isTxQueue(q)) {
+ std::istringstream is(q);
+ is.seekg(PREFIX.size());
+ Uuid id;
+ is >> id;
+ if (!is.fail()) return id;
+ }
+ throw Exception(QPID_MSG("Invalid tx queue: " << q));
}
string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; }
@@ -85,15 +90,14 @@ TxReplicator::TxReplicator(
const boost::shared_ptr<broker::Queue>& txQueue,
const boost::shared_ptr<broker::Link>& link) :
QueueReplicator(hb, txQueue, link),
+ logPrefix(hb.logPrefix),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
empty(true), ended(false),
dequeueState(hb.getBroker().getQueues())
{
- string id(getTxId(txQueue->getName()));
- string shortId = id.substr(0, 8);
- logPrefix = "Backup of transaction "+shortId+": ";
- QPID_LOG(debug, logPrefix << "Started TX " << id);
+ logPrefix = "Backup of TX "+shortStr(getTxId(txQueue->getName()))+": ";
+ QPID_LOG(debug, logPrefix << "Started");
if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
// Dispatch transaction events.
@@ -213,7 +217,7 @@ void TxReplicator::prepare(const string&, sys::Mutex::ScopedLock& l) {
QPID_LOG(debug, logPrefix << "Local prepare OK");
sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l);
} else {
- QPID_LOG(debug, logPrefix << "Local prepare failed");
+ QPID_LOG(error, logPrefix << "Local prepare failed");
sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l);
}
}
@@ -240,7 +244,7 @@ void TxReplicator::backups(const string& data, sys::Mutex::ScopedLock& l) {
TxBackupsEvent e;
decodeStr(data, e);
if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) {
- QPID_LOG(info, logPrefix << "Not participating in transaction");
+ QPID_LOG(info, logPrefix << "Not participating");
end(l);
} else {
QPID_LOG(debug, logPrefix << "Backups: " << e.backups);
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
index fe25fbc78b..c7599d21b1 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -22,11 +22,13 @@
*
*/
+#include "LogPrefix.h"
#include "QueueReplicator.h"
#include "Event.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/types/Uuid.h"
namespace qpid {
@@ -56,7 +58,7 @@ class TxReplicator : public QueueReplicator {
typedef boost::shared_ptr<broker::Link> LinkPtr;
static bool isTxQueue(const std::string& queue);
- static std::string getTxId(const std::string& queue);
+ static types::Uuid getTxId(const std::string& queue);
static boost::shared_ptr<TxReplicator> create(
HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
@@ -90,7 +92,7 @@ class TxReplicator : public QueueReplicator {
void backups(const std::string& data, sys::Mutex::ScopedLock&);
void end(sys::Mutex::ScopedLock&);
- std::string logPrefix;
+ LogPrefix2 logPrefix;
TxEnqueueEvent enq; // Enqueue data for next deliver.
boost::intrusive_ptr<broker::TxBuffer> txBuffer;
broker::MessageStore* store;
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index 3d9941baee..d64f13d9c5 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -165,7 +165,7 @@ struct Client
session.close();
connection.close();
} catch(const std::exception& e) {
- std::cout << e.what() << std::endl;
+ std::cout << "Client shutdown: " << e.what() << std::endl;
}
}
};
@@ -350,7 +350,7 @@ int main(int argc, char** argv)
}
return 0;
} catch(const std::exception& e) {
- std::cout << e.what() << std::endl;
+ std::cout << argv[0] << ": " << e.what() << std::endl;
}
return 2;
}