diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/Exception.cpp | 12 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 17 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.cpp | 28 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandlerObserver.h | 51 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 30 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ErrorListener.h | 60 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 46 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 14 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/PrimaryTxObserver.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 45 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.h | 17 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.cpp | 21 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/TxReplicator.h | 3 |
15 files changed, 279 insertions, 76 deletions
diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp index a6696f06e1..999c2aeb52 100644 --- a/qpid/cpp/src/qpid/Exception.cpp +++ b/qpid/cpp/src/qpid/Exception.cpp @@ -45,16 +45,20 @@ Exception::Exception(const std::string& msg) throw() : message(msg) { Exception::~Exception() throw() {} -std::string Exception::getPrefix() const { return ""; } +std::string Exception::getPrefix() const { return std::string(); } std::string Exception::getMessage() const { return message; } +namespace { const std::string COLON(": "); } + const char* Exception::what() const throw() { // Construct the what string the first time it is needed. if (whatStr.empty()) { - whatStr = getPrefix(); - if (!whatStr.empty()) whatStr += ": "; - whatStr += message; + if (message.compare(0, getPrefix().size(), getPrefix()) == 0 || // Already has prefix + getPrefix().empty()) // No prefix + whatStr = message; + else + whatStr = getPrefix() + COLON + message; } return whatStr.c_str(); } diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 5eedafc77b..43f39c2919 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -94,8 +94,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } } catch(const SessionException& e) { - QPID_LOG(error, "Execution exception: " << e.what()); - executionException(e.code, e.what()); // Let subclass handle this first. + executionException(e.code, e.what()); framing::AMQP_AllProxy::Execution execution(channel); AMQMethodBody* m = f.getMethod(); SequenceNumber commandId; @@ -105,16 +104,13 @@ void SessionHandler::handleIn(AMQFrame& f) { sendDetach(); } catch(const ChannelException& e){ - QPID_LOG(error, "Channel exception: " << e.what()); - channelException(e.code, e.what()); // Let subclass handle this first. + channelException(e.code, e.what()); peer.detached(name, e.code); } catch(const ConnectionException& e) { - QPID_LOG(error, "Connection exception: " << e.what()); connectionException(e.code, e.getMessage()); } catch(const std::exception& e) { - QPID_LOG(error, "Unexpected exception: " << e.what()); connectionException(connection::CLOSE_CODE_FRAMING_ERROR, e.what()); } } @@ -186,13 +182,14 @@ void SessionHandler::detach(const std::string& name) { } void SessionHandler::detached(const std::string& /*name*/, uint8_t code) { - // Special case for detached: Don't check if we are - // attached. Checking can lead to an endless game of "detached - // tennis" on federated brokers. awaitingDetached = false; + // Special case for detached: Don't throw if we are not attached. Doing so + // can lead to an endless game of "detached tennis" on federated brokers. + if (!getState()) return; // Already detached. if (code != session::DETACH_CODE_NORMAL) { sendReady = receiveReady = false; - channelException(convert(code), "session.detached from peer."); + channelException(convert(code), Msg() << "Channel " << channel.get() + << " received session.detached from peer"); } else { handleDetach(); } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 13679bd623..4bad8f2960 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -38,6 +38,7 @@ #include "qpid/broker/System.h" #include "qpid/broker/ConsumerFactory.h" #include "qpid/broker/ConnectionObservers.h" +#include "qpid/broker/SessionHandlerObserver.h" #include "qpid/broker/BrokerObservers.h" #include "qpid/management/Manageable.h" #include "qpid/sys/ConnectionCodec.h" @@ -179,6 +180,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DataDir dataDir; DataDir pagingDir; ConnectionObservers connectionObservers; + SessionHandlerObservers sessionHandlerObservers; BrokerObservers brokerObservers; QueueRegistry queues; @@ -361,6 +363,7 @@ class Broker : public sys::Runnable, public Plugin::Target, ConsumerFactories& getConsumerFactories() { return consumerFactories; } ConnectionObservers& getConnectionObservers() { return connectionObservers; } + SessionHandlerObservers& getSessionHandlerObservers() { return sessionHandlerObservers; } BrokerObservers& getBrokerObservers() { return brokerObservers; } /** Properties to be set on outgoing link connections */ diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index a8f5734af7..3d5faf2dab 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -33,11 +33,35 @@ using namespace framing; using namespace std; using namespace qpid::sys; +namespace { +class DefaultErrorListener : public SessionHandler::ErrorListener { + public: + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(error, "Connection exception: " << framing::createConnectionException(code, msg).what()); + } + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(error, "Channel exception: " << framing::createChannelException(code, msg).what()); + } + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(error, "Execution exception: " << framing::createSessionException(code, msg).what()); + } + void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, "Incoming execution exception: " << framing::createSessionException(code, msg).what()); + } + void detach() {} + + private: +}; +} + SessionHandler::SessionHandler(amqp_0_10::Connection& c, ChannelId ch) : qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch), connection(c), - proxy(out) -{} + proxy(out), + errorListener(boost::shared_ptr<ErrorListener>(new DefaultErrorListener())) +{ + c.getBroker().getSessionHandlerObservers().newSessionHandler(*this); +} SessionHandler::~SessionHandler() { diff --git a/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h b/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h new file mode 100644 index 0000000000..6d0ea16254 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h @@ -0,0 +1,51 @@ +#ifndef QPID_BROKER_SESSIONHANDLEROBSERVER_H +#define QPID_BROKER_SESSIONHANDLEROBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Observers.h" + +namespace qpid { +namespace broker { +class SessionHandler; + +/** + * Observer of session handler events. + */ +class SessionHandlerObserver +{ + public: + virtual ~SessionHandlerObserver() {} + virtual void newSessionHandler(SessionHandler&) {} +}; + + +class SessionHandlerObservers : public Observers<SessionHandlerObserver> { + public: + void newSessionHandler(SessionHandler& sh) { + each(boost::bind(&SessionHandlerObserver::newSessionHandler, _1, boost::ref(sh))); + } +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_SESSIONHANDLEROBSERVER_H*/ diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index b1faf19e52..7928b6ab71 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -172,34 +172,29 @@ Variant::Map asMapVoid(const Variant& value) { } } // namespace -// Listens for errors on the bridge session. -class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener { +// Report errors on the broker replication session. +class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorListener { public: - ErrorListener(const std::string& lp, BrokerReplicator& br) : - logPrefix(lp), brokerReplicator(br) {} + ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} - void connectionException(framing::connection::CloseCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Connection error: " << msg); + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what()); } - void channelException(framing::session::DetachCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Channel error: " << msg); + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what()); } - void executionException(framing::execution::ErrorCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Execution error: " << msg); + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what()); } - - void incomingExecutionException( - framing::execution::ErrorCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Incoming execution error: " << msg); + void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); } - void detach() { QPID_LOG(debug, logPrefix << "Session detached."); } private: std::string logPrefix; - BrokerReplicator& brokerReplicator; }; /** Keep track of queues or exchanges during the update process to solve 2 @@ -328,8 +323,7 @@ void BrokerReplicator::initialize() { boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2) ); assert(result.second); - result.first->setErrorListener( - boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this))); + result.first->setErrorListener(boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix))); broker.getConnectionObservers().add(shared_from_this()); } diff --git a/qpid/cpp/src/qpid/ha/ErrorListener.h b/qpid/cpp/src/qpid/ha/ErrorListener.h new file mode 100644 index 0000000000..1ae2078a11 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ErrorListener.h @@ -0,0 +1,60 @@ +#ifndef QPID_HA_ERRORLISTENER_H +#define QPID_HA_ERRORLISTENER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/SessionHandler.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace ha { + +/** Default ErrorListener for HA module */ +class ErrorListener : public broker::SessionHandler::ErrorListener { + public: + ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} + + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what()); + } + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what()); + } + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what()); + } + void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); + } + void detach() { + QPID_LOG(error, logPrefix << "Session detached."); + } + + private: + std::string logPrefix; +}; + + +}} // namespace qpid::ha + +#endif /*!QPID_HA_ERRORLISTENER_H*/ diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 3bb51b1813..b4d50d1652 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -33,6 +33,7 @@ #include "qpid/broker/BrokerObserver.h" #include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/SessionHandlerObserver.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" @@ -87,12 +88,54 @@ class ExpectedBackupTimerTask : public sys::TimerTask { Primary& primary; }; +class PrimaryErrorListener : public broker::SessionHandler::ErrorListener { + public: + PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} + + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what()); + } + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what()); + } + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); + } + void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); + } + void detach() { + QPID_LOG(debug, logPrefix << "Session detached."); + } + + private: + std::string logPrefix; +}; + +class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver { + public: + PrimarySessionHandlerObserver(const std::string& logPrefix) + : errorListener(new PrimaryErrorListener(logPrefix)) {} + void newSessionHandler(broker::SessionHandler& sh) { + BrokerInfo info; + // Suppress error logging for backup connections + // TODO aconway 2014-01-31: Be more selective, suppress only expected errors? + if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) { + sh.setErrorListener(errorListener); + } + } + private: + boost::shared_ptr<PrimaryErrorListener> errorListener; +}; + + } // namespace Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : haBroker(hb), membership(hb.getMembership()), logPrefix("Primary: "), active(false), replicationTest(hb.getSettings().replicateDefault.get()), + sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)), queueLimits(logPrefix) { // Note that at this point, we are still rejecting client connections. @@ -124,6 +167,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : } brokerObserver.reset(new PrimaryBrokerObserver(*this)); haBroker.getBroker().getBrokerObservers().add(brokerObserver); + haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver); + checkReady(); // Outside lock // Allow client connections @@ -134,6 +179,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : Primary::~Primary() { if (timerTask) timerTask->cancel(); haBroker.getBroker().getBrokerObservers().remove(brokerObserver); + haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver); haBroker.getObserver()->reset(); } diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 2e32515c9a..af368bca0f 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -42,6 +42,7 @@ class Queue; class Connection; class ConnectionObserver; class BrokerObserver; +class SessionHandlerObserver; class TxBuffer; class DtxBuffer; } @@ -152,6 +153,7 @@ class Primary : public Role BackupMap backups; boost::shared_ptr<broker::ConnectionObserver> connectionObserver; boost::shared_ptr<broker::BrokerObserver> brokerObserver; + boost::shared_ptr<broker::SessionHandlerObserver> sessionHandlerObserver; boost::intrusive_ptr<sys::TimerTask> timerTask; ReplicaMap replicas; TxMap txMap; diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index dc5bf15911..c94ced7024 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -98,7 +98,8 @@ PrimaryTxObserver::PrimaryTxObserver( replicationTest(hb.getSettings().replicateDefault.get()), txBuffer(tx), id(true), - exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()) + exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()), + empty(true) { logPrefix = "Primary transaction "+shortStr(id)+": "; @@ -149,6 +150,7 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m) if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m)); checkState(SENDING, "Too late for enqueue"); + empty = false; enqueues[q] += m.getReplicationId(); txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message()); txQueue->deliver(m); @@ -162,12 +164,9 @@ void PrimaryTxObserver::dequeue( checkState(SENDING, "Too late for dequeue"); if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id)); + empty = false; txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); } - else { - QPID_LOG(warning, logPrefix << "Dequeue skipped, queue not replicated: " - << LogMessageId(*q, pos, id)); - } } namespace { @@ -221,8 +220,10 @@ void PrimaryTxObserver::commit() { } void PrimaryTxObserver::rollback() { - QPID_LOG(debug, logPrefix << "Rollback"); Mutex::ScopedLock l(lock); + // Don't bleat about rolling back empty transactions, this happens all the time + // when a session closes and rolls back its outstanding transaction. + if (!empty) QPID_LOG(debug, logPrefix << "Rollback"); if (state != ENDED) { txQueue->deliver(TxRollbackEvent().message()); end(l); @@ -287,7 +288,6 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) { void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) { Mutex::ScopedLock l(lock); types::Uuid backup = rs.getBrokerInfo().getSystemId(); - QPID_LOG(debug, logPrefix << "Backup disconnected: " << backup); // Normally the backup should be completed before it is cancelled. if (completed(backup, l)) error(backup, "Unexpected disconnect:", l); // Break the pointer cycle if backups have completed and we are done with txBuffer. diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index 105fee4d40..5b7c2e3e93 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -55,8 +55,9 @@ class Primary; * A TxReplicator on the backup replicates the tx-queue and creates * a TxBuffer on the backup equivalent to the one on the primary. * - * Also observes the tx-queue for prepare-complete messages and - * subscription cancellations. + * Creates an exchange to receive prepare-ok/prepare-fail messages from backups. + * + * Monitors for tx-queue subscription cancellations. * * THREAD SAFE: called in user connection thread for TX events, * and in backup connection threads for prepare-completed events @@ -122,6 +123,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, QueueIdsMap enqueues; UuidSet backups; // All backups of transaction. UuidSet incomplete; // Incomplete backups (not yet responded to prepare) + bool empty; // True if the transaction is empty - no enqueues/dequeues. }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 507df6ea5a..59b2013f59 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -73,17 +73,26 @@ void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) { registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1)); } +// Debug log expected exceptions on queue replicator, check incoming execution +// exceptions for "deleted on primary" conditions. class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { public: ErrorListener(const boost::shared_ptr<QueueReplicator>& qr) : queueReplicator(qr), logPrefix(qr->logPrefix) {} - void connectionException(framing::connection::CloseCode, const std::string&) {} - void channelException(framing::session::DetachCode, const std::string&) {} - void executionException(framing::execution::ErrorCode, const std::string&) {} - - void incomingExecutionException(ErrorCode e, const std::string& msg) { - queueReplicator->incomingExecutionException(e, msg); + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what()); + } + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what()); + } + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); + } + void incomingExecutionException(ErrorCode code, const std::string& msg) { + if (!queueReplicator->deletedOnPrimary(code, msg)) + QPID_LOG(error, logPrefix << "Incoming " + << framing::createSessionException(code, msg).what()); } void detach() { QPID_LOG(debug, logPrefix << "Session detached"); @@ -197,20 +206,25 @@ void QueueReplicator::disconnect() { // Called from Queue::destroyed() void QueueReplicator::destroy() { + QPID_LOG(debug, logPrefix << "Destroyed"); boost::shared_ptr<Bridge> bridge2; // To call outside of lock { Mutex::ScopedLock l(lock); if (!queue) return; // Already destroyed - QPID_LOG(debug, logPrefix << "Destroyed"); bridge2 = bridge; // call close outside the lock. - // Need to drop shared pointers to avoid pointer cycles keeping this in memory. - queue.reset(); - bridge.reset(); - getBroker()->getExchanges().destroy(getName()); + destroy(l); } if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock. } +void QueueReplicator::destroy(Mutex::ScopedLock&) { + // Need to drop shared pointers to avoid pointer cycles keeping this in memory. + queue.reset(); + bridge.reset(); + getBroker()->getExchanges().destroy(getName()); +} + + // Called in a broker connection thread when the bridge is created. // Note: called with the Link lock held. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) { @@ -306,18 +320,19 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) { nextId = decodeStr<IdEvent>(data).id; } -void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) { +bool QueueReplicator::deletedOnPrimary(ErrorCode e, const std::string& msg) { if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) { // If the queue is destroyed at the same time we are subscribing, we may // get a not-found or resource-deleted exception before the // BrokerReplicator gets the queue-delete event. Shut down the bridge by // calling destroy(), we can let the BrokerReplicator delete the queue // when the queue-delete arrives. - QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg); + QPID_LOG(debug, logPrefix << "Deleted on primary: " + << framing::createSessionException(e, msg).what()); destroy(); + return true; } - else - QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg); + return false; } // Unused Exchange methods. diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 22cd13a0a8..f94c6de116 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -46,11 +46,13 @@ class HaBroker; class Settings; /** - * Exchange created on a backup broker to replicate a queue on the primary. + * Exchange created on a backup broker to receive replicated messages and + * replication events from a queue on the primary. It subscribes to the primary + * queue via a ReplicatingSubscription on the primary by passing special + * arguments to the subscribe command. * - * Puts replicated messages on the local queue, handles dequeue events. - * Creates a ReplicatingSubscription on the primary by passing special - * arguments to the consume command. + * It puts replicated messages on the local replica queue and handles dequeue + * events by removing local messages. * * THREAD SAFE: Called in different connection threads. */ @@ -74,7 +76,7 @@ class QueueReplicator : public broker::Exchange, void disconnect(); // Called when we are disconnected from the primary. - std::string getType() const; + virtual std::string getType() const; void route(broker::Deliverable&); @@ -101,7 +103,9 @@ class QueueReplicator : public broker::Exchange, void initialize(); // Called as part of create() virtual void deliver(const broker::Message&); + virtual void destroy(); // Called when the queue is destroyed. + virtual void destroy(sys::Mutex::ScopedLock&); sys::Mutex lock; HaBroker& haBroker; @@ -124,8 +128,7 @@ class QueueReplicator : public broker::Exchange, void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&); void idEvent(const std::string& data, sys::Mutex::ScopedLock&); - void incomingExecutionException(framing::execution::ErrorCode e, - const std::string& msg); + bool deletedOnPrimary(framing::execution::ErrorCode e, const std::string& msg); std::string logPrefix; std::string bridgeName; diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index 7ff03b5f92..d2a647ae8f 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -87,7 +87,7 @@ TxReplicator::TxReplicator( QueueReplicator(hb, txQueue, link), store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0), channel(link->nextChannel()), - ended(false), + empty(true), ended(false), dequeueState(hb.getBroker().getQueues()) { string id(getTxId(txQueue->getName())); @@ -151,6 +151,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { decodeStr(data, e); QPID_LOG(trace, logPrefix << "Enqueue: " << e); enq = e; + empty = false; } void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) { @@ -163,6 +164,7 @@ void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) { // prepared, then they are all receieved before the prepare event. // We collect the events here so we can do a single scan of the queue in prepare. dequeueState.add(e); + empty = false; } void TxReplicator::DequeueState::add(const TxDequeueEvent& event) { @@ -227,7 +229,9 @@ void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { if (!txBuffer) return; - QPID_LOG(debug, logPrefix << "Rollback"); + // Don't bleat about rolling back empty transactions, this happens all the time + // when a session closes and rolls back its outstanding transaction. + if (!empty) QPID_LOG(debug, logPrefix << "Rollback"); if (context.get()) store->abort(*context); txBuffer->rollback(); end(l); @@ -255,15 +259,12 @@ void TxReplicator::end(sys::Mutex::ScopedLock&) { } // Called when the tx queue is deleted. -void TxReplicator::destroy() { - { - sys::Mutex::ScopedLock l(lock); - if (!ended) { - QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback."); - rollback(string(), l); - } +void TxReplicator::destroy(sys::Mutex::ScopedLock& l) { + if (!ended) { + if (!empty) QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback"); + rollback(string(), l); } - QueueReplicator::destroy(); + QueueReplicator::destroy(l); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h index 7f1256699a..5c509d14a7 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.h +++ b/qpid/cpp/src/qpid/ha/TxReplicator.h @@ -67,7 +67,8 @@ class TxReplicator : public QueueReplicator { // QueueReplicator overrides void route(broker::Deliverable& deliverable); - void destroy(); + using QueueReplicator::destroy; + void destroy(sys::Mutex::ScopedLock&); protected: |
