From 2f28d0179033f3caeed58a69bc9ff11afe6e1b80 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 28 May 2012 18:24:43 +0000 Subject: QPID-3603: Better log messages for HA code. Identify host name of backup in ReplicatingSubscription logs. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343351 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/ha/BrokerInfo.cpp | 5 +- qpid/cpp/src/qpid/ha/BrokerInfo.h | 8 ++- qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 38 +++++++------- qpid/cpp/src/qpid/ha/HaBroker.cpp | 5 +- qpid/cpp/src/qpid/ha/LogPrefix.cpp | 14 ++++-- qpid/cpp/src/qpid/ha/LogPrefix.h | 3 +- qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 17 ++++--- qpid/cpp/src/qpid/ha/QueueReplicator.h | 4 +- qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 64 +++++++++++++----------- qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 7 +-- 10 files changed, 97 insertions(+), 68 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp index 59c58a68b5..2422bcd3e2 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp @@ -31,6 +31,7 @@ namespace ha { namespace { std::string SYSTEM_ID="system-id"; std::string HOST_NAME="host-name"; +std::string PORT="port"; std::string STATUS="status"; } @@ -41,6 +42,7 @@ FieldTable BrokerInfo::asFieldTable() const { FieldTable ft; ft.setString(SYSTEM_ID, systemId.str()); ft.setString(HOST_NAME, hostName); + ft.setInt(PORT, port); ft.setInt(STATUS, status); return ft; } @@ -48,11 +50,12 @@ FieldTable BrokerInfo::asFieldTable() const { void BrokerInfo::assign(const FieldTable& ft) { systemId = Uuid(ft.getAsString(SYSTEM_ID)); hostName = ft.getAsString(HOST_NAME); + port = ft.getAsInt(PORT); status = BrokerStatus(ft.getAsInt(STATUS)); } std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { - return o << b.getHostName() << "(" << b.getSystemId() + return o << b.getHostName() << ":" << b.getPort() << "(" << b.getSystemId() << "," << printable(b.getStatus()) << ")"; } diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h index d72b6793ff..7ccbd056c3 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.h +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h @@ -23,6 +23,7 @@ */ #include "Enum.h" +#include "qpid/Url.h" #include "qpid/framing/Uuid.h" #include "qpid/framing/FieldTable.h" #include @@ -37,8 +38,8 @@ namespace ha { class BrokerInfo { public: - BrokerInfo(const std::string& host, const framing::Uuid& id) : - hostName(host), systemId(id) {} + BrokerInfo(const std::string& host, uint16_t port_, const framing::Uuid& id) : + hostName(host), port(port_), systemId(id) {} BrokerInfo(const framing::FieldTable& ft) { assign(ft); } framing::FieldTable asFieldTable() const; @@ -47,10 +48,13 @@ class BrokerInfo framing::Uuid getSystemId() const { return systemId; } std::string getHostName() const { return hostName; } BrokerStatus getStatus() const { return status; } + uint16_t getPort() const { return port; } + void setStatus(BrokerStatus s) { status = s; } private: std::string hostName; + uint16_t port; framing::Uuid systemId; BrokerStatus status; }; diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 60ec4ea59f..2415aff84a 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -233,7 +233,7 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); - QPID_LOG(debug, logPrefix << "opened configuration bridge: " << queueName); + QPID_LOG(debug, logPrefix << "Opened configuration bridge: " << queueName); } void BrokerReplicator::route(Deliverable& msg) { @@ -271,7 +271,7 @@ void BrokerReplicator::route(Deliverable& msg) { } } } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "configuration failed: " << e.what() + QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what() << ": while handling: " << list); throw; } @@ -290,7 +290,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { // The queue was definitely created on the primary. if (broker.getQueues().find(name)) { broker.getQueues().destroy(name); - QPID_LOG(warning, logPrefix << "queue declare event, replaced exsiting: " + QPID_LOG(warning, logPrefix << "Queue declare event, replaced exsiting: " << name); } std::pair, bool> result = @@ -304,7 +304,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()); assert(result.second); // Should be true since we destroyed existing queue above - QPID_LOG(debug, logPrefix << "queue declare event: " << name); + QPID_LOG(debug, logPrefix << "Queue declare event: " << name); startQueueReplicator(result.first); } } @@ -323,9 +323,9 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { string name = values[QNAME].asString(); boost::shared_ptr queue = broker.getQueues().find(name); if (!queue) { - QPID_LOG(warning, logPrefix << "queue delete event, does not exist: " << name); + QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name); } else if (!haBroker.replicateLevel(queue->getSettings())) { - QPID_LOG(warning, logPrefix << "queue delete event, not replicated: " << name); + QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name); } else { boost::shared_ptr qr = findQueueReplicator(name); if (qr) { @@ -336,7 +336,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { broker.getExchanges().destroy(qr->getName()); } broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); - QPID_LOG(debug, logPrefix << "queue delete event: " << name); + QPID_LOG(debug, logPrefix << "Queue delete event: " << name); } } @@ -351,7 +351,7 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { // The exchange was definitely created on the primary. if (broker.getExchanges().find(name)) { broker.getExchanges().destroy(name); - QPID_LOG(warning, logPrefix << "exchange declare event, replaced exsiting: " << name) + QPID_LOG(warning, logPrefix << "Exchange declare event, replaced exsiting: " << name) } std::pair, bool> result = broker.createExchange( @@ -363,7 +363,7 @@ void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()); assert(result.second); - QPID_LOG(debug, logPrefix << "exchange declare event: " << name); + QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); } } @@ -371,11 +371,11 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); boost::shared_ptr exchange = broker.getExchanges().find(name); if (!exchange) { - QPID_LOG(warning, logPrefix << "exchange delete event, does not exist: " << name); + QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name); } else if (!haBroker.replicateLevel(exchange->getArgs())) { - QPID_LOG(warning, logPrefix << "exchange delete event, not replicated: " << name); + QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { - QPID_LOG(debug, logPrefix << "exchange delete event:" << name); + QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); broker.deleteExchange( name, values[USER].asString(), @@ -397,7 +397,7 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, logPrefix << "bind event: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } @@ -417,7 +417,7 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); exchange->unbind(queue, key, &args); - QPID_LOG(debug, logPrefix << "unbind event: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } @@ -444,7 +444,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: what should we use as connection id?*/); // It is normal for the queue to already exist if we are failing over. if (result.second) startQueueReplicator(result.first); - QPID_LOG(debug, logPrefix << "queue response: " << name); + QPID_LOG(debug, logPrefix << "Queue response: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -461,9 +461,9 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(debug, logPrefix << "exchange response: " << values[NAME].asString()); + QPID_LOG(debug, logPrefix << "Exchange response: " << values[NAME].asString()); } else { - QPID_LOG(warning, logPrefix << "exchange response, already exists: " << + QPID_LOG(warning, logPrefix << "Exchange response, already exists: " << values[NAME].asString()); } } @@ -503,7 +503,7 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, logPrefix << "bind response: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "Bind response: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } @@ -545,7 +545,7 @@ void BrokerReplicator::startQueueReplicator(const boost::shared_ptr& queu { if (haBroker.replicateLevel(queue->getSettings()) == ALL) { boost::shared_ptr qr( - new QueueReplicator(LogPrefix(haBroker, queue->getName()), queue, link)); + new QueueReplicator(haBroker, queue, link)); if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 71995c1ac2..9b58bac484 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -55,6 +55,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) status(STANDALONE), excluder(new ConnectionExcluder(logPrefix, broker.getSystem()->getSystemId())), brokerInfo(broker.getSystem()->getNodeName(), + // TODO aconway 2012-05-24: other transports? + broker.getPort(broker::Broker::TCP_TRANSPORT), broker.getSystem()->getSystemId()) { @@ -180,8 +182,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, boost::shared_ptr link = result.first; link->setUrl(url); // Create a queue replicator - boost::shared_ptr qr( - new QueueReplicator(LogPrefix(*this, queue->getName()), queue, link)); + boost::shared_ptr qr(new QueueReplicator(*this, queue, link)); qr->activate(); broker.getExchanges().registerExchange(qr); break; diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/LogPrefix.cpp index 828e226677..d80fe23458 100644 --- a/qpid/cpp/src/qpid/ha/LogPrefix.cpp +++ b/qpid/cpp/src/qpid/ha/LogPrefix.cpp @@ -25,22 +25,26 @@ namespace qpid { namespace ha { -LogPrefix::LogPrefix(HaBroker& hb, const std::string& queue) : haBroker(&hb), status(0) { - if (queue.size()) tail = " queue " + queue; +LogPrefix::LogPrefix(HaBroker& hb, const std::string& msg) : haBroker(&hb), status(0) { + if (msg.size()) setMessage(msg); } -LogPrefix::LogPrefix(LogPrefix& lp, const std::string& queue) +LogPrefix::LogPrefix(LogPrefix& lp, const std::string& msg) : haBroker(lp.haBroker), status(0) { - if (queue.size()) tail = " queue " + queue; + if (msg.size()) setMessage(msg); } LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {} +void LogPrefix::setMessage(const std::string& msg) { + tail = " "+msg+":"; +} + std::ostream& operator<<(std::ostream& o, const LogPrefix& l) { return o << "HA(" << printable(l.status ? *l.status : l.haBroker->getStatus()) - << ")" << l.tail << ": "; + << ")" << l.tail << " "; } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.h b/qpid/cpp/src/qpid/ha/LogPrefix.h index 2db9f3c409..da01154a80 100644 --- a/qpid/cpp/src/qpid/ha/LogPrefix.h +++ b/qpid/cpp/src/qpid/ha/LogPrefix.h @@ -40,10 +40,11 @@ class LogPrefix /** For use by all classes other than HaBroker */ LogPrefix(HaBroker& hb, const std::string& queue=std::string()); LogPrefix(LogPrefix& lp, const std::string& queue); - /** For use by the HaBroker itself. */ LogPrefix(BrokerStatus&); + void setMessage(const std::string&); + private: HaBroker* haBroker; BrokerStatus* status; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 47fc3afdeb..58c5e452d7 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -21,6 +21,7 @@ #include "Counter.h" #include "QueueReplicator.h" +#include "HaBroker.h" #include "ReplicatingSubscription.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" @@ -59,14 +60,15 @@ bool QueueReplicator::isEventKey(const std::string key) { return ret; } -QueueReplicator::QueueReplicator(const LogPrefix& lp, +QueueReplicator::QueueReplicator(HaBroker& hb, boost::shared_ptr q, boost::shared_ptr l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), - logPrefix(lp), queue(q), link(l) + haBroker(hb), logPrefix(hb), queue(q), link(l) { - framing::Uuid uuid(true); + Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); + logPrefix.setMessage(q->getName()); QPID_LOG(info, logPrefix << "Created"); } @@ -109,12 +111,15 @@ void QueueReplicator::deactivate() { // Called in a broker connection thread when the bridge is created. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { sys::Mutex::ScopedLock l(lock); - framing::AMQP_ServerProxy peer(sessionHandler.out); + AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); - framing::FieldTable settings; + FieldTable settings; settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? - settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); + settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, + queue->getPosition()); + settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO, + haBroker.getBrokerInfo().asFieldTable()); SequenceNumber front; if (ReplicatingSubscription::getFront(*queue, front)) settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index f583b650fa..1b221a8d28 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -42,6 +42,7 @@ class Deliverable; namespace ha { class Counter; +class HaBroker; /** * Exchange created on a backup broker to replicate a queue on the primary. @@ -62,7 +63,7 @@ class QueueReplicator : public broker::Exchange, /** Test if a string is an event key */ static bool isEventKey(const std::string key); - QueueReplicator(const LogPrefix&, + QueueReplicator(HaBroker&, boost::shared_ptr q, boost::shared_ptr l); @@ -82,6 +83,7 @@ class QueueReplicator : public broker::Exchange, void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&); + HaBroker& haBroker; LogPrefix logPrefix; std::string bridgeName; sys::Mutex lock; diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 8c88382b94..9d7df51e3b 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -20,6 +20,7 @@ */ #include "ReplicatingSubscription.h" +#include "HaBroker.h" #include "Primary.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" @@ -40,6 +41,7 @@ using namespace std; const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number"); const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number"); +const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.broker-info"); namespace { const string DOLLAR("$"); @@ -59,7 +61,7 @@ class DequeueRemover } void operator()(const QueuedMessage& message) { - if (message.position >= start && message.position <= end) { + if (message.position >= start && message.position <= end) { //i.e. message is within the intial range and has not been dequeued, //so remove it from the dequeues dequeues.remove(message.position); @@ -127,7 +129,7 @@ ReplicatingSubscription::Factory::create( boost::shared_ptr rs; if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { rs.reset(new ReplicatingSubscription( - LogPrefix(haBroker), + haBroker, parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->addObserver(rs); @@ -173,7 +175,7 @@ ostream& operator<<(ostream& o, const QueueRange& qr) { } ReplicatingSubscription::ReplicatingSubscription( - LogPrefix lp, + HaBroker& hb, SemanticState* parent, const string& name, Queue::shared_ptr queue, @@ -186,17 +188,24 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments), - logPrefix(lp, queue->getName()), + haBroker(hb), + logPrefix(hb), dummy(new Queue(mask(name))), ready(false) { try { - // FIXME aconway 2012-05-22: use hostname from brokerinfo - // Separate the remote part from a "local-remote" address for logging. - string address = parent->getSession().getConnection().getUrl(); - size_t i = address.find('-'); - if (i != string::npos) address = address.substr(i+1); - logSuffix = " (" + address + ")"; + // Set a log prefix message that identifies the remote broker. + // FIXME aconway 2012-05-24: use URL instead of host:port, include transport? + ostringstream os; + os << queue->getName() << "@"; + FieldTable ft; + if (arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) { + BrokerInfo info(ft); + os << info.getHostName() << ":" << info.getPort(); + } + else + os << parent->getSession().getConnection().getUrl(); + logPrefix.setMessage(os.str()); QueueRange primary(*queue); QueueRange backup(arguments); @@ -228,16 +237,16 @@ ReplicatingSubscription::ReplicatingSubscription( << " backup range " << backup << " primary range " << primary << " position " << position - << " dequeues " << dequeues << logSuffix); + << " dequeues " << dequeues); } catch (const std::exception& e) { throw Exception(QPID_MSG(logPrefix << "Error setting up replication: " - << e.what() << logSuffix)); + << e.what())); } } ReplicatingSubscription::~ReplicatingSubscription() { - QPID_LOG(debug, logPrefix << "Detroyed replicating subscription" << logSuffix); + QPID_LOG(debug, logPrefix << "Detroyed replicating subscription"); } // Called in subscription's connection thread when the subscription is created. @@ -258,7 +267,7 @@ void ReplicatingSubscription::initialize() { } else { QPID_LOG(debug, logPrefix << "Backup subscription catching up from " - << position << " to " << readyPosition << logSuffix); + << position << " to " << readyPosition); } } @@ -267,7 +276,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { try { // Add position events for the subscribed queue, not the internal event queue. if (qm.queue == getQueue().get()) { - QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix); + QPID_LOG(trace, logPrefix << "Replicating " << qm); { sys::Mutex::ScopedLock l(lock); assert(position == qm.position); @@ -296,8 +305,8 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { else return ConsumerImpl::deliver(qm); // Message is for internal event queue. } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "error replicating " << qm - << logSuffix << ": " << e.what()); + QPID_LOG(critical, logPrefix << "Error replicating " << qm + << ": " << e.what()); throw; } } @@ -305,7 +314,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) { if (ready) return; ready = true; - QPID_LOG(info, logPrefix << "Caught up at " << getPosition() << logSuffix); + QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); // Notify Primary that a subscription is ready. if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName()); } @@ -319,7 +328,7 @@ void ReplicatingSubscription::complete( { // Handle completions for the subscribed queue, not the internal event queue. if (qm.queue == getQueue().get()) { - QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix); + QPID_LOG(trace, logPrefix << "Completed " << qm); Delayed::iterator i= delayed.find(qm.position); // The same message can be completed twice, by acknowledged and // dequeued, remove it from the set so it only gets completed @@ -337,7 +346,7 @@ void ReplicatingSubscription::complete( // Called in arbitrary connection thread *with the queue lock held* void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { // Delay completion - QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix); + QPID_LOG(trace, logPrefix << "Delaying completion of " << qm); qm.payload->getIngressCompletion().startCompleter(); { sys::Mutex::ScopedLock l(lock); @@ -350,7 +359,7 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { void ReplicatingSubscription::cancelComplete( const Delayed::value_type& v, const sys::Mutex::ScopedLock&) { - QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix); + QPID_LOG(trace, logPrefix << "Cancel completed " << v.second); v.second.payload->getIngressCompletion().finishCompleter(); } @@ -361,8 +370,8 @@ void ReplicatingSubscription::cancel() boost::dynamic_pointer_cast(shared_from_this())); { sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix << "cancel backup subscription to " - << getQueue()->getName() << logSuffix); + QPID_LOG(debug, logPrefix << "Cancel backup subscription to " + << getQueue()->getName()); for_each(delayed.begin(), delayed.end(), boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); delayed.clear(); @@ -385,8 +394,7 @@ bool ReplicatingSubscription::hideDeletedError() { return true; } void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&) { if (dequeues.empty()) return; - QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues - << " from " << getQueue()->getName() << logSuffix); + QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); @@ -401,7 +409,7 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&) void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { { - QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix); + QPID_LOG(trace, logPrefix << "Dequeued " << qm); sys::Mutex::ScopedLock l(lock); dequeues.add(qm.position); // If we have not yet sent this message to the backup, then @@ -415,8 +423,8 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const sys::Mutex::ScopedLock&) { if (pos == backupPosition) return; // No need to send. - QPID_LOG(trace, logPrefix << "sending position " << pos - << ", was " << backupPosition << logSuffix); + QPID_LOG(trace, logPrefix << "Sending position " << pos + << ", was " << backupPosition); string buf(pos.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); pos.encode(buffer); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 0956d6d503..ab02949952 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -76,8 +76,9 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, static const std::string QPID_REPLICATING_SUBSCRIPTION; static const std::string QPID_HIGH_SEQUENCE_NUMBER; static const std::string QPID_LOW_SEQUENCE_NUMBER; + static const std::string QPID_BROKER_INFO; - // FIXME aconway 2012-05-23: these don't belong on ReplicatingSubscription + // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription /** Get position of front message on queue. *@return false if queue is empty. */ @@ -88,7 +89,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, static bool getNext(broker::Queue&, framing::SequenceNumber from, framing::SequenceNumber& result); - ReplicatingSubscription(LogPrefix, + ReplicatingSubscription(HaBroker&, broker::SemanticState* parent, const std::string& name, boost::shared_ptr , bool ack, bool acquire, bool exclusive, const std::string& tag, @@ -120,8 +121,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, private: typedef std::map Delayed; + HaBroker& haBroker; LogPrefix logPrefix; - std::string logSuffix; boost::shared_ptr dummy; // Used to send event messages Delayed delayed; framing::SequenceSet dequeues; -- cgit v1.2.1