summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.h8
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp38
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/LogPrefix.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/LogPrefix.h3
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h4
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp64
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h7
10 files changed, 97 insertions, 68 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
index 59c58a68b5..2422bcd3e2 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -31,6 +31,7 @@ namespace ha {
namespace {
std::string SYSTEM_ID="system-id";
std::string HOST_NAME="host-name";
+std::string PORT="port";
std::string STATUS="status";
}
@@ -41,6 +42,7 @@ FieldTable BrokerInfo::asFieldTable() const {
FieldTable ft;
ft.setString(SYSTEM_ID, systemId.str());
ft.setString(HOST_NAME, hostName);
+ ft.setInt(PORT, port);
ft.setInt(STATUS, status);
return ft;
}
@@ -48,11 +50,12 @@ FieldTable BrokerInfo::asFieldTable() const {
void BrokerInfo::assign(const FieldTable& ft) {
systemId = Uuid(ft.getAsString(SYSTEM_ID));
hostName = ft.getAsString(HOST_NAME);
+ port = ft.getAsInt(PORT);
status = BrokerStatus(ft.getAsInt(STATUS));
}
std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
- return o << b.getHostName() << "(" << b.getSystemId()
+ return o << b.getHostName() << ":" << b.getPort() << "(" << b.getSystemId()
<< "," << printable(b.getStatus()) << ")";
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h
index d72b6793ff..7ccbd056c3 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.h
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h
@@ -23,6 +23,7 @@
*/
#include "Enum.h"
+#include "qpid/Url.h"
#include "qpid/framing/Uuid.h"
#include "qpid/framing/FieldTable.h"
#include <string>
@@ -37,8 +38,8 @@ namespace ha {
class BrokerInfo
{
public:
- BrokerInfo(const std::string& host, const framing::Uuid& id) :
- hostName(host), systemId(id) {}
+ BrokerInfo(const std::string& host, uint16_t port_, const framing::Uuid& id) :
+ hostName(host), port(port_), systemId(id) {}
BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
framing::FieldTable asFieldTable() const;
@@ -47,10 +48,13 @@ class BrokerInfo
framing::Uuid getSystemId() const { return systemId; }
std::string getHostName() const { return hostName; }
BrokerStatus getStatus() const { return status; }
+ uint16_t getPort() const { return port; }
+
void setStatus(BrokerStatus s) { status = s; }
private:
std::string hostName;
+ uint16_t port;
framing::Uuid systemId;
BrokerStatus status;
};
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 60ec4ea59f..2415aff84a 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -233,7 +233,7 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
- QPID_LOG(debug, logPrefix << "opened configuration bridge: " << queueName);
+ QPID_LOG(debug, logPrefix << "Opened configuration bridge: " << queueName);
}
void BrokerReplicator::route(Deliverable& msg) {
@@ -271,7 +271,7 @@ void BrokerReplicator::route(Deliverable& msg) {
}
}
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "configuration failed: " << e.what()
+ QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
<< ": while handling: " << list);
throw;
}
@@ -290,7 +290,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
// The queue was definitely created on the primary.
if (broker.getQueues().find(name)) {
broker.getQueues().destroy(name);
- QPID_LOG(warning, logPrefix << "queue declare event, replaced exsiting: "
+ QPID_LOG(warning, logPrefix << "Queue declare event, replaced exsiting: "
<< name);
}
std::pair<boost::shared_ptr<Queue>, bool> result =
@@ -304,7 +304,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString());
assert(result.second); // Should be true since we destroyed existing queue above
- QPID_LOG(debug, logPrefix << "queue declare event: " << name);
+ QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
startQueueReplicator(result.first);
}
}
@@ -323,9 +323,9 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (!queue) {
- QPID_LOG(warning, logPrefix << "queue delete event, does not exist: " << name);
+ QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name);
} else if (!haBroker.replicateLevel(queue->getSettings())) {
- QPID_LOG(warning, logPrefix << "queue delete event, not replicated: " << name);
+ QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name);
} else {
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
if (qr) {
@@ -336,7 +336,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
broker.getExchanges().destroy(qr->getName());
}
broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
- QPID_LOG(debug, logPrefix << "queue delete event: " << name);
+ QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
}
}
@@ -351,7 +351,7 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
// The exchange was definitely created on the primary.
if (broker.getExchanges().find(name)) {
broker.getExchanges().destroy(name);
- QPID_LOG(warning, logPrefix << "exchange declare event, replaced exsiting: " << name)
+ QPID_LOG(warning, logPrefix << "Exchange declare event, replaced exsiting: " << name)
}
std::pair<boost::shared_ptr<Exchange>, bool> result =
broker.createExchange(
@@ -363,7 +363,7 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString());
assert(result.second);
- QPID_LOG(debug, logPrefix << "exchange declare event: " << name);
+ QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
}
}
@@ -371,11 +371,11 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
if (!exchange) {
- QPID_LOG(warning, logPrefix << "exchange delete event, does not exist: " << name);
+ QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name);
} else if (!haBroker.replicateLevel(exchange->getArgs())) {
- QPID_LOG(warning, logPrefix << "exchange delete event, not replicated: " << name);
+ QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
- QPID_LOG(debug, logPrefix << "exchange delete event:" << name);
+ QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
broker.deleteExchange(
name,
values[USER].asString(),
@@ -397,7 +397,7 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
exchange->bind(queue, key, &args);
- QPID_LOG(debug, logPrefix << "bind event: exchange=" << exchange->getName()
+ QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
}
@@ -417,7 +417,7 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
exchange->unbind(queue, key, &args);
- QPID_LOG(debug, logPrefix << "unbind event: exchange=" << exchange->getName()
+ QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
}
@@ -444,7 +444,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: what should we use as connection id?*/);
// It is normal for the queue to already exist if we are failing over.
if (result.second) startQueueReplicator(result.first);
- QPID_LOG(debug, logPrefix << "queue response: " << name);
+ QPID_LOG(debug, logPrefix << "Queue response: " << name);
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -461,9 +461,9 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/).second)
{
- QPID_LOG(debug, logPrefix << "exchange response: " << values[NAME].asString());
+ QPID_LOG(debug, logPrefix << "Exchange response: " << values[NAME].asString());
} else {
- QPID_LOG(warning, logPrefix << "exchange response, already exists: " <<
+ QPID_LOG(warning, logPrefix << "Exchange response, already exists: " <<
values[NAME].asString());
}
}
@@ -503,7 +503,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
string key = values[KEY].asString();
exchange->bind(queue, key, &args);
- QPID_LOG(debug, logPrefix << "bind response: exchange=" << exchange->getName()
+ QPID_LOG(debug, logPrefix << "Bind response: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
}
@@ -545,7 +545,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queu
{
if (haBroker.replicateLevel(queue->getSettings()) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(LogPrefix(haBroker, queue->getName()), queue, link));
+ new QueueReplicator(haBroker, queue, link));
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 71995c1ac2..9b58bac484 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -55,6 +55,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
status(STANDALONE),
excluder(new ConnectionExcluder(logPrefix, broker.getSystem()->getSystemId())),
brokerInfo(broker.getSystem()->getNodeName(),
+ // TODO aconway 2012-05-24: other transports?
+ broker.getPort(broker::Broker::TCP_TRANSPORT),
broker.getSystem()->getSystemId())
{
@@ -180,8 +182,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
boost::shared_ptr<broker::Link> link = result.first;
link->setUrl(url);
// Create a queue replicator
- boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(LogPrefix(*this, queue->getName()), queue, link));
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(*this, queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
break;
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/LogPrefix.cpp
index 828e226677..d80fe23458 100644
--- a/qpid/cpp/src/qpid/ha/LogPrefix.cpp
+++ b/qpid/cpp/src/qpid/ha/LogPrefix.cpp
@@ -25,22 +25,26 @@
namespace qpid {
namespace ha {
-LogPrefix::LogPrefix(HaBroker& hb, const std::string& queue) : haBroker(&hb), status(0) {
- if (queue.size()) tail = " queue " + queue;
+LogPrefix::LogPrefix(HaBroker& hb, const std::string& msg) : haBroker(&hb), status(0) {
+ if (msg.size()) setMessage(msg);
}
-LogPrefix::LogPrefix(LogPrefix& lp, const std::string& queue)
+LogPrefix::LogPrefix(LogPrefix& lp, const std::string& msg)
: haBroker(lp.haBroker), status(0)
{
- if (queue.size()) tail = " queue " + queue;
+ if (msg.size()) setMessage(msg);
}
LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {}
+void LogPrefix::setMessage(const std::string& msg) {
+ tail = " "+msg+":";
+}
+
std::ostream& operator<<(std::ostream& o, const LogPrefix& l) {
return o << "HA("
<< printable(l.status ? *l.status : l.haBroker->getStatus())
- << ")" << l.tail << ": ";
+ << ")" << l.tail << " ";
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.h b/qpid/cpp/src/qpid/ha/LogPrefix.h
index 2db9f3c409..da01154a80 100644
--- a/qpid/cpp/src/qpid/ha/LogPrefix.h
+++ b/qpid/cpp/src/qpid/ha/LogPrefix.h
@@ -40,10 +40,11 @@ class LogPrefix
/** For use by all classes other than HaBroker */
LogPrefix(HaBroker& hb, const std::string& queue=std::string());
LogPrefix(LogPrefix& lp, const std::string& queue);
-
/** For use by the HaBroker itself. */
LogPrefix(BrokerStatus&);
+ void setMessage(const std::string&);
+
private:
HaBroker* haBroker;
BrokerStatus* status;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 47fc3afdeb..58c5e452d7 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -21,6 +21,7 @@
#include "Counter.h"
#include "QueueReplicator.h"
+#include "HaBroker.h"
#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
@@ -59,14 +60,15 @@ bool QueueReplicator::isEventKey(const std::string key) {
return ret;
}
-QueueReplicator::QueueReplicator(const LogPrefix& lp,
+QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
- logPrefix(lp), queue(q), link(l)
+ haBroker(hb), logPrefix(hb), queue(q), link(l)
{
- framing::Uuid uuid(true);
+ Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
+ logPrefix.setMessage(q->getName());
QPID_LOG(info, logPrefix << "Created");
}
@@ -109,12 +111,15 @@ void QueueReplicator::deactivate() {
// Called in a broker connection thread when the bridge is created.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
sys::Mutex::ScopedLock l(lock);
- framing::AMQP_ServerProxy peer(sessionHandler.out);
+ AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
- framing::FieldTable settings;
+ FieldTable settings;
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
- settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
+ settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER,
+ queue->getPosition());
+ settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
+ haBroker.getBrokerInfo().asFieldTable());
SequenceNumber front;
if (ReplicatingSubscription::getFront(*queue, front))
settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front);
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index f583b650fa..1b221a8d28 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -42,6 +42,7 @@ class Deliverable;
namespace ha {
class Counter;
+class HaBroker;
/**
* Exchange created on a backup broker to replicate a queue on the primary.
@@ -62,7 +63,7 @@ class QueueReplicator : public broker::Exchange,
/** Test if a string is an event key */
static bool isEventKey(const std::string key);
- QueueReplicator(const LogPrefix&,
+ QueueReplicator(HaBroker&,
boost::shared_ptr<broker::Queue> q,
boost::shared_ptr<broker::Link> l);
@@ -82,6 +83,7 @@ class QueueReplicator : public broker::Exchange,
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+ HaBroker& haBroker;
LogPrefix logPrefix;
std::string bridgeName;
sys::Mutex lock;
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 8c88382b94..9d7df51e3b 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,6 +20,7 @@
*/
#include "ReplicatingSubscription.h"
+#include "HaBroker.h"
#include "Primary.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
@@ -40,6 +41,7 @@ using namespace std;
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
+const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.broker-info");
namespace {
const string DOLLAR("$");
@@ -59,7 +61,7 @@ class DequeueRemover
}
void operator()(const QueuedMessage& message) {
- if (message.position >= start && message.position <= end) {
+ if (message.position >= start && message.position <= end) {
//i.e. message is within the intial range and has not been dequeued,
//so remove it from the dequeues
dequeues.remove(message.position);
@@ -127,7 +129,7 @@ ReplicatingSubscription::Factory::create(
boost::shared_ptr<ReplicatingSubscription> rs;
if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
rs.reset(new ReplicatingSubscription(
- LogPrefix(haBroker),
+ haBroker,
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
queue->addObserver(rs);
@@ -173,7 +175,7 @@ ostream& operator<<(ostream& o, const QueueRange& qr) {
}
ReplicatingSubscription::ReplicatingSubscription(
- LogPrefix lp,
+ HaBroker& hb,
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
@@ -186,17 +188,24 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments),
- logPrefix(lp, queue->getName()),
+ haBroker(hb),
+ logPrefix(hb),
dummy(new Queue(mask(name))),
ready(false)
{
try {
- // FIXME aconway 2012-05-22: use hostname from brokerinfo
- // Separate the remote part from a "local-remote" address for logging.
- string address = parent->getSession().getConnection().getUrl();
- size_t i = address.find('-');
- if (i != string::npos) address = address.substr(i+1);
- logSuffix = " (" + address + ")";
+ // Set a log prefix message that identifies the remote broker.
+ // FIXME aconway 2012-05-24: use URL instead of host:port, include transport?
+ ostringstream os;
+ os << queue->getName() << "@";
+ FieldTable ft;
+ if (arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) {
+ BrokerInfo info(ft);
+ os << info.getHostName() << ":" << info.getPort();
+ }
+ else
+ os << parent->getSession().getConnection().getUrl();
+ logPrefix.setMessage(os.str());
QueueRange primary(*queue);
QueueRange backup(arguments);
@@ -228,16 +237,16 @@ ReplicatingSubscription::ReplicatingSubscription(
<< " backup range " << backup
<< " primary range " << primary
<< " position " << position
- << " dequeues " << dequeues << logSuffix);
+ << " dequeues " << dequeues);
}
catch (const std::exception& e) {
throw Exception(QPID_MSG(logPrefix << "Error setting up replication: "
- << e.what() << logSuffix));
+ << e.what()));
}
}
ReplicatingSubscription::~ReplicatingSubscription() {
- QPID_LOG(debug, logPrefix << "Detroyed replicating subscription" << logSuffix);
+ QPID_LOG(debug, logPrefix << "Detroyed replicating subscription");
}
// Called in subscription's connection thread when the subscription is created.
@@ -258,7 +267,7 @@ void ReplicatingSubscription::initialize() {
}
else {
QPID_LOG(debug, logPrefix << "Backup subscription catching up from "
- << position << " to " << readyPosition << logSuffix);
+ << position << " to " << readyPosition);
}
}
@@ -267,7 +276,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
try {
// Add position events for the subscribed queue, not the internal event queue.
if (qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix);
+ QPID_LOG(trace, logPrefix << "Replicating " << qm);
{
sys::Mutex::ScopedLock l(lock);
assert(position == qm.position);
@@ -296,8 +305,8 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
else
return ConsumerImpl::deliver(qm); // Message is for internal event queue.
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "error replicating " << qm
- << logSuffix << ": " << e.what());
+ QPID_LOG(critical, logPrefix << "Error replicating " << qm
+ << ": " << e.what());
throw;
}
}
@@ -305,7 +314,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) {
if (ready) return;
ready = true;
- QPID_LOG(info, logPrefix << "Caught up at " << getPosition() << logSuffix);
+ QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
// Notify Primary that a subscription is ready.
if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName());
}
@@ -319,7 +328,7 @@ void ReplicatingSubscription::complete(
{
// Handle completions for the subscribed queue, not the internal event queue.
if (qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
+ QPID_LOG(trace, logPrefix << "Completed " << qm);
Delayed::iterator i= delayed.find(qm.position);
// The same message can be completed twice, by acknowledged and
// dequeued, remove it from the set so it only gets completed
@@ -337,7 +346,7 @@ void ReplicatingSubscription::complete(
// Called in arbitrary connection thread *with the queue lock held*
void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
// Delay completion
- QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix);
+ QPID_LOG(trace, logPrefix << "Delaying completion of " << qm);
qm.payload->getIngressCompletion().startCompleter();
{
sys::Mutex::ScopedLock l(lock);
@@ -350,7 +359,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
void ReplicatingSubscription::cancelComplete(
const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
{
- QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix);
+ QPID_LOG(trace, logPrefix << "Cancel completed " << v.second);
v.second.payload->getIngressCompletion().finishCompleter();
}
@@ -361,8 +370,8 @@ void ReplicatingSubscription::cancel()
boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "cancel backup subscription to "
- << getQueue()->getName() << logSuffix);
+ QPID_LOG(debug, logPrefix << "Cancel backup subscription to "
+ << getQueue()->getName());
for_each(delayed.begin(), delayed.end(),
boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
delayed.clear();
@@ -385,8 +394,7 @@ bool ReplicatingSubscription::hideDeletedError() { return true; }
void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
{
if (dequeues.empty()) return;
- QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
- << " from " << getQueue()->getName() << logSuffix);
+ QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
@@ -401,7 +409,7 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
{
- QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix);
+ QPID_LOG(trace, logPrefix << "Dequeued " << qm);
sys::Mutex::ScopedLock l(lock);
dequeues.add(qm.position);
// If we have not yet sent this message to the backup, then
@@ -415,8 +423,8 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const sys::Mutex::ScopedLock&)
{
if (pos == backupPosition) return; // No need to send.
- QPID_LOG(trace, logPrefix << "sending position " << pos
- << ", was " << backupPosition << logSuffix);
+ QPID_LOG(trace, logPrefix << "Sending position " << pos
+ << ", was " << backupPosition);
string buf(pos.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
pos.encode(buffer);
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 0956d6d503..ab02949952 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -76,8 +76,9 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
static const std::string QPID_REPLICATING_SUBSCRIPTION;
static const std::string QPID_HIGH_SEQUENCE_NUMBER;
static const std::string QPID_LOW_SEQUENCE_NUMBER;
+ static const std::string QPID_BROKER_INFO;
- // FIXME aconway 2012-05-23: these don't belong on ReplicatingSubscription
+ // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription
/** Get position of front message on queue.
*@return false if queue is empty.
*/
@@ -88,7 +89,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
static bool getNext(broker::Queue&, framing::SequenceNumber from,
framing::SequenceNumber& result);
- ReplicatingSubscription(LogPrefix,
+ ReplicatingSubscription(HaBroker&,
broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
@@ -120,8 +121,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
private:
typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
+ HaBroker& haBroker;
LogPrefix logPrefix;
- std::string logSuffix;
boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
Delayed delayed;
framing::SequenceSet dequeues;