summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/Exception.cpp12
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h3
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandlerObserver.h51
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp30
-rw-r--r--qpid/cpp/src/qpid/ha/ErrorListener.h60
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp46
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h2
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h6
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp45
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h17
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp21
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h3
15 files changed, 279 insertions, 76 deletions
diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp
index a6696f06e1..999c2aeb52 100644
--- a/qpid/cpp/src/qpid/Exception.cpp
+++ b/qpid/cpp/src/qpid/Exception.cpp
@@ -45,16 +45,20 @@ Exception::Exception(const std::string& msg) throw() : message(msg) {
Exception::~Exception() throw() {}
-std::string Exception::getPrefix() const { return ""; }
+std::string Exception::getPrefix() const { return std::string(); }
std::string Exception::getMessage() const { return message; }
+namespace { const std::string COLON(": "); }
+
const char* Exception::what() const throw() {
// Construct the what string the first time it is needed.
if (whatStr.empty()) {
- whatStr = getPrefix();
- if (!whatStr.empty()) whatStr += ": ";
- whatStr += message;
+ if (message.compare(0, getPrefix().size(), getPrefix()) == 0 || // Already has prefix
+ getPrefix().empty()) // No prefix
+ whatStr = message;
+ else
+ whatStr = getPrefix() + COLON + message;
}
return whatStr.c_str();
}
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 5eedafc77b..43f39c2919 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -94,8 +94,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
}
catch(const SessionException& e) {
- QPID_LOG(error, "Execution exception: " << e.what());
- executionException(e.code, e.what()); // Let subclass handle this first.
+ executionException(e.code, e.what());
framing::AMQP_AllProxy::Execution execution(channel);
AMQMethodBody* m = f.getMethod();
SequenceNumber commandId;
@@ -105,16 +104,13 @@ void SessionHandler::handleIn(AMQFrame& f) {
sendDetach();
}
catch(const ChannelException& e){
- QPID_LOG(error, "Channel exception: " << e.what());
- channelException(e.code, e.what()); // Let subclass handle this first.
+ channelException(e.code, e.what());
peer.detached(name, e.code);
}
catch(const ConnectionException& e) {
- QPID_LOG(error, "Connection exception: " << e.what());
connectionException(e.code, e.getMessage());
}
catch(const std::exception& e) {
- QPID_LOG(error, "Unexpected exception: " << e.what());
connectionException(connection::CLOSE_CODE_FRAMING_ERROR, e.what());
}
}
@@ -186,13 +182,14 @@ void SessionHandler::detach(const std::string& name) {
}
void SessionHandler::detached(const std::string& /*name*/, uint8_t code) {
- // Special case for detached: Don't check if we are
- // attached. Checking can lead to an endless game of "detached
- // tennis" on federated brokers.
awaitingDetached = false;
+ // Special case for detached: Don't throw if we are not attached. Doing so
+ // can lead to an endless game of "detached tennis" on federated brokers.
+ if (!getState()) return; // Already detached.
if (code != session::DETACH_CODE_NORMAL) {
sendReady = receiveReady = false;
- channelException(convert(code), "session.detached from peer.");
+ channelException(convert(code), Msg() << "Channel " << channel.get()
+ << " received session.detached from peer");
} else {
handleDetach();
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 13679bd623..4bad8f2960 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -38,6 +38,7 @@
#include "qpid/broker/System.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
+#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/broker/BrokerObservers.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/ConnectionCodec.h"
@@ -179,6 +180,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DataDir dataDir;
DataDir pagingDir;
ConnectionObservers connectionObservers;
+ SessionHandlerObservers sessionHandlerObservers;
BrokerObservers brokerObservers;
QueueRegistry queues;
@@ -361,6 +363,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
ConnectionObservers& getConnectionObservers() { return connectionObservers; }
+ SessionHandlerObservers& getSessionHandlerObservers() { return sessionHandlerObservers; }
BrokerObservers& getBrokerObservers() { return brokerObservers; }
/** Properties to be set on outgoing link connections */
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index a8f5734af7..3d5faf2dab 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -33,11 +33,35 @@ using namespace framing;
using namespace std;
using namespace qpid::sys;
+namespace {
+class DefaultErrorListener : public SessionHandler::ErrorListener {
+ public:
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, "Connection exception: " << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, "Channel exception: " << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, "Execution exception: " << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, "Incoming execution exception: " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {}
+
+ private:
+};
+}
+
SessionHandler::SessionHandler(amqp_0_10::Connection& c, ChannelId ch)
: qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
- proxy(out)
-{}
+ proxy(out),
+ errorListener(boost::shared_ptr<ErrorListener>(new DefaultErrorListener()))
+{
+ c.getBroker().getSessionHandlerObservers().newSessionHandler(*this);
+}
SessionHandler::~SessionHandler()
{
diff --git a/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h b/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h
new file mode 100644
index 0000000000..6d0ea16254
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h
@@ -0,0 +1,51 @@
+#ifndef QPID_BROKER_SESSIONHANDLEROBSERVER_H
+#define QPID_BROKER_SESSIONHANDLEROBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Observers.h"
+
+namespace qpid {
+namespace broker {
+class SessionHandler;
+
+/**
+ * Observer of session handler events.
+ */
+class SessionHandlerObserver
+{
+ public:
+ virtual ~SessionHandlerObserver() {}
+ virtual void newSessionHandler(SessionHandler&) {}
+};
+
+
+class SessionHandlerObservers : public Observers<SessionHandlerObserver> {
+ public:
+ void newSessionHandler(SessionHandler& sh) {
+ each(boost::bind(&SessionHandlerObserver::newSessionHandler, _1, boost::ref(sh)));
+ }
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_SESSIONHANDLEROBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index b1faf19e52..7928b6ab71 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -172,34 +172,29 @@ Variant::Map asMapVoid(const Variant& value) {
}
} // namespace
-// Listens for errors on the bridge session.
-class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
+// Report errors on the broker replication session.
+class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorListener {
public:
- ErrorListener(const std::string& lp, BrokerReplicator& br) :
- logPrefix(lp), brokerReplicator(br) {}
+ ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
- void connectionException(framing::connection::CloseCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Connection error: " << msg);
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
}
- void channelException(framing::session::DetachCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Channel error: " << msg);
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what());
}
- void executionException(framing::execution::ErrorCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what());
}
-
- void incomingExecutionException(
- framing::execution::ErrorCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Incoming execution error: " << msg);
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
-
void detach() {
QPID_LOG(debug, logPrefix << "Session detached.");
}
private:
std::string logPrefix;
- BrokerReplicator& brokerReplicator;
};
/** Keep track of queues or exchanges during the update process to solve 2
@@ -328,8 +323,7 @@ void BrokerReplicator::initialize() {
boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
);
assert(result.second);
- result.first->setErrorListener(
- boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
+ result.first->setErrorListener(boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
broker.getConnectionObservers().add(shared_from_this());
}
diff --git a/qpid/cpp/src/qpid/ha/ErrorListener.h b/qpid/cpp/src/qpid/ha/ErrorListener.h
new file mode 100644
index 0000000000..1ae2078a11
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ErrorListener.h
@@ -0,0 +1,60 @@
+#ifndef QPID_HA_ERRORLISTENER_H
+#define QPID_HA_ERRORLISTENER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace ha {
+
+/** Default ErrorListener for HA module */
+class ErrorListener : public broker::SessionHandler::ErrorListener {
+ public:
+ ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {
+ QPID_LOG(error, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+};
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_ERRORLISTENER_H*/
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 3bb51b1813..b4d50d1652 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -87,12 +88,54 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary& primary;
};
+class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
+ public:
+ PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {
+ QPID_LOG(debug, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+};
+
+class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
+ public:
+ PrimarySessionHandlerObserver(const std::string& logPrefix)
+ : errorListener(new PrimaryErrorListener(logPrefix)) {}
+ void newSessionHandler(broker::SessionHandler& sh) {
+ BrokerInfo info;
+ // Suppress error logging for backup connections
+ // TODO aconway 2014-01-31: Be more selective, suppress only expected errors?
+ if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) {
+ sh.setErrorListener(errorListener);
+ }
+ }
+ private:
+ boost::shared_ptr<PrimaryErrorListener> errorListener;
+};
+
+
} // namespace
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
+ sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
queueLimits(logPrefix)
{
// Note that at this point, we are still rejecting client connections.
@@ -124,6 +167,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
}
brokerObserver.reset(new PrimaryBrokerObserver(*this));
haBroker.getBroker().getBrokerObservers().add(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver);
+
checkReady(); // Outside lock
// Allow client connections
@@ -134,6 +179,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
Primary::~Primary() {
if (timerTask) timerTask->cancel();
haBroker.getBroker().getBrokerObservers().remove(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver);
haBroker.getObserver()->reset();
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 2e32515c9a..af368bca0f 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -42,6 +42,7 @@ class Queue;
class Connection;
class ConnectionObserver;
class BrokerObserver;
+class SessionHandlerObserver;
class TxBuffer;
class DtxBuffer;
}
@@ -152,6 +153,7 @@ class Primary : public Role
BackupMap backups;
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::BrokerObserver> brokerObserver;
+ boost::shared_ptr<broker::SessionHandlerObserver> sessionHandlerObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
ReplicaMap replicas;
TxMap txMap;
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index dc5bf15911..c94ced7024 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -98,7 +98,8 @@ PrimaryTxObserver::PrimaryTxObserver(
replicationTest(hb.getSettings().replicateDefault.get()),
txBuffer(tx),
id(true),
- exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str())
+ exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
+ empty(true)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -149,6 +150,7 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
checkState(SENDING, "Too late for enqueue");
+ empty = false;
enqueues[q] += m.getReplicationId();
txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
txQueue->deliver(m);
@@ -162,12 +164,9 @@ void PrimaryTxObserver::dequeue(
checkState(SENDING, "Too late for dequeue");
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
+ empty = false;
txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
}
- else {
- QPID_LOG(warning, logPrefix << "Dequeue skipped, queue not replicated: "
- << LogMessageId(*q, pos, id));
- }
}
namespace {
@@ -221,8 +220,10 @@ void PrimaryTxObserver::commit() {
}
void PrimaryTxObserver::rollback() {
- QPID_LOG(debug, logPrefix << "Rollback");
Mutex::ScopedLock l(lock);
+ // Don't bleat about rolling back empty transactions, this happens all the time
+ // when a session closes and rolls back its outstanding transaction.
+ if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
if (state != ENDED) {
txQueue->deliver(TxRollbackEvent().message());
end(l);
@@ -287,7 +288,6 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
- QPID_LOG(debug, logPrefix << "Backup disconnected: " << backup);
// Normally the backup should be completed before it is cancelled.
if (completed(backup, l)) error(backup, "Unexpected disconnect:", l);
// Break the pointer cycle if backups have completed and we are done with txBuffer.
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 105fee4d40..5b7c2e3e93 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -55,8 +55,9 @@ class Primary;
* A TxReplicator on the backup replicates the tx-queue and creates
* a TxBuffer on the backup equivalent to the one on the primary.
*
- * Also observes the tx-queue for prepare-complete messages and
- * subscription cancellations.
+ * Creates an exchange to receive prepare-ok/prepare-fail messages from backups.
+ *
+ * Monitors for tx-queue subscription cancellations.
*
* THREAD SAFE: called in user connection thread for TX events,
* and in backup connection threads for prepare-completed events
@@ -122,6 +123,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
QueueIdsMap enqueues;
UuidSet backups; // All backups of transaction.
UuidSet incomplete; // Incomplete backups (not yet responded to prepare)
+ bool empty; // True if the transaction is empty - no enqueues/dequeues.
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 507df6ea5a..59b2013f59 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -73,17 +73,26 @@ void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
}
+// Debug log expected exceptions on queue replicator, check incoming execution
+// exceptions for "deleted on primary" conditions.
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
: queueReplicator(qr), logPrefix(qr->logPrefix) {}
- void connectionException(framing::connection::CloseCode, const std::string&) {}
- void channelException(framing::session::DetachCode, const std::string&) {}
- void executionException(framing::execution::ErrorCode, const std::string&) {}
-
- void incomingExecutionException(ErrorCode e, const std::string& msg) {
- queueReplicator->incomingExecutionException(e, msg);
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(ErrorCode code, const std::string& msg) {
+ if (!queueReplicator->deletedOnPrimary(code, msg))
+ QPID_LOG(error, logPrefix << "Incoming "
+ << framing::createSessionException(code, msg).what());
}
void detach() {
QPID_LOG(debug, logPrefix << "Session detached");
@@ -197,20 +206,25 @@ void QueueReplicator::disconnect() {
// Called from Queue::destroyed()
void QueueReplicator::destroy() {
+ QPID_LOG(debug, logPrefix << "Destroyed");
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
- QPID_LOG(debug, logPrefix << "Destroyed");
bridge2 = bridge; // call close outside the lock.
- // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
- queue.reset();
- bridge.reset();
- getBroker()->getExchanges().destroy(getName());
+ destroy(l);
}
if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}
+void QueueReplicator::destroy(Mutex::ScopedLock&) {
+ // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
+ queue.reset();
+ bridge.reset();
+ getBroker()->getExchanges().destroy(getName());
+}
+
+
// Called in a broker connection thread when the bridge is created.
// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
@@ -306,18 +320,19 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
nextId = decodeStr<IdEvent>(data).id;
}
-void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
+bool QueueReplicator::deletedOnPrimary(ErrorCode e, const std::string& msg) {
if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
// If the queue is destroyed at the same time we are subscribing, we may
// get a not-found or resource-deleted exception before the
// BrokerReplicator gets the queue-delete event. Shut down the bridge by
// calling destroy(), we can let the BrokerReplicator delete the queue
// when the queue-delete arrives.
- QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg);
+ QPID_LOG(debug, logPrefix << "Deleted on primary: "
+ << framing::createSessionException(e, msg).what());
destroy();
+ return true;
}
- else
- QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg);
+ return false;
}
// Unused Exchange methods.
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 22cd13a0a8..f94c6de116 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -46,11 +46,13 @@ class HaBroker;
class Settings;
/**
- * Exchange created on a backup broker to replicate a queue on the primary.
+ * Exchange created on a backup broker to receive replicated messages and
+ * replication events from a queue on the primary. It subscribes to the primary
+ * queue via a ReplicatingSubscription on the primary by passing special
+ * arguments to the subscribe command.
*
- * Puts replicated messages on the local queue, handles dequeue events.
- * Creates a ReplicatingSubscription on the primary by passing special
- * arguments to the consume command.
+ * It puts replicated messages on the local replica queue and handles dequeue
+ * events by removing local messages.
*
* THREAD SAFE: Called in different connection threads.
*/
@@ -74,7 +76,7 @@ class QueueReplicator : public broker::Exchange,
void disconnect(); // Called when we are disconnected from the primary.
- std::string getType() const;
+ virtual std::string getType() const;
void route(broker::Deliverable&);
@@ -101,7 +103,9 @@ class QueueReplicator : public broker::Exchange,
void initialize(); // Called as part of create()
virtual void deliver(const broker::Message&);
+
virtual void destroy(); // Called when the queue is destroyed.
+ virtual void destroy(sys::Mutex::ScopedLock&);
sys::Mutex lock;
HaBroker& haBroker;
@@ -124,8 +128,7 @@ class QueueReplicator : public broker::Exchange,
void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
void idEvent(const std::string& data, sys::Mutex::ScopedLock&);
- void incomingExecutionException(framing::execution::ErrorCode e,
- const std::string& msg);
+ bool deletedOnPrimary(framing::execution::ErrorCode e, const std::string& msg);
std::string logPrefix;
std::string bridgeName;
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index 7ff03b5f92..d2a647ae8f 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -87,7 +87,7 @@ TxReplicator::TxReplicator(
QueueReplicator(hb, txQueue, link),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
- ended(false),
+ empty(true), ended(false),
dequeueState(hb.getBroker().getQueues())
{
string id(getTxId(txQueue->getName()));
@@ -151,6 +151,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Enqueue: " << e);
enq = e;
+ empty = false;
}
void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
@@ -163,6 +164,7 @@ void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
// prepared, then they are all receieved before the prepare event.
// We collect the events here so we can do a single scan of the queue in prepare.
dequeueState.add(e);
+ empty = false;
}
void TxReplicator::DequeueState::add(const TxDequeueEvent& event) {
@@ -227,7 +229,9 @@ void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
if (!txBuffer) return;
- QPID_LOG(debug, logPrefix << "Rollback");
+ // Don't bleat about rolling back empty transactions, this happens all the time
+ // when a session closes and rolls back its outstanding transaction.
+ if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
if (context.get()) store->abort(*context);
txBuffer->rollback();
end(l);
@@ -255,15 +259,12 @@ void TxReplicator::end(sys::Mutex::ScopedLock&) {
}
// Called when the tx queue is deleted.
-void TxReplicator::destroy() {
- {
- sys::Mutex::ScopedLock l(lock);
- if (!ended) {
- QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback.");
- rollback(string(), l);
- }
+void TxReplicator::destroy(sys::Mutex::ScopedLock& l) {
+ if (!ended) {
+ if (!empty) QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback");
+ rollback(string(), l);
}
- QueueReplicator::destroy();
+ QueueReplicator::destroy(l);
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
index 7f1256699a..5c509d14a7 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -67,7 +67,8 @@ class TxReplicator : public QueueReplicator {
// QueueReplicator overrides
void route(broker::Deliverable& deliverable);
- void destroy();
+ using QueueReplicator::destroy;
+ void destroy(sys::Mutex::ScopedLock&);
protected: