summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-08-28 21:47:44 +0000
committerAlan Conway <aconway@apache.org>2014-08-28 21:47:44 +0000
commitb93b20dd123757b208f9e78ef778e3648c3438a0 (patch)
tree57c643010a5fc63792b0031eb75682ad728dae31 /cpp/src/qpid
parent2c9370641727e5d365d0cf52e9a0ba2d5faa087b (diff)
downloadqpid-python-b93b20dd123757b208f9e78ef778e3648c3438a0.tar.gz
QPID-5975: HA extra/missing messages when running qpid-txtest2 in a loop with failover.
This is partly not-a-bug, there is a client error handling issue that has been corrected. qpid-txtest2 initializes a queue with messages at the start and drains the queues at the end. These operations are *not transactional*. Therefore duplicates are expected if there is a failover during initialization or draining. When duplicates were observed, there was indeed a failover at one of these times. Making these operations transactional is not enough to pass, now we see the test fail with "no messages to fetch". This is explained as follows: If there is a failover during a transaction, TransactionAborted is raised. The client assumes the transaction was rolled back and re-plays it. However, if the failover occurs at a critical point *after* the client has sent commit but *before* it has received a response, then the the client *does not know* whether the transaction was committed or rolled-back on the new primary. Re-playing in this case can duplicate the transaction. Each transaction moves messages from one queue to another so as long as transactions are atomic the total number of messages will not change. However, if transactions are duplicated, a transactional session may try to move more messages than exist on the queue, hence "no messages to fetch". For example if thread 1 moves N messages from q1 to q2, and thread 2 tries to move N+M messages back, then thread 2 will fail. This problem has been corrected as follows: C++ and python clients now raise the following exceptions: - TransactionAborted: The transaction has definitely been rolled back due to a connection failure before commit or a broker error (e.g. a store error) during commit. It can safely be replayed. - TransactionUnknown: The transaction outcome is unknown because the connection failed at the critical time. There's no simple automatic way to know what happened without examining the state of the broker queues. Unfortunately With this fix qpid-txtest2 is no longer useful test for TX failover because it regularly raises TransactionUnknown and there's not much we can do with that. A better test of TX atomicity with failover is to run a pair of qpid-send/qpid-receive with fail-over and verify that the number of enqueues/dequeues and message depth are a multiple of the transaction size. See the JIRA for such a test. (Note these test also sometimes raise TransactionUnknown but it doesn't matter since all we are checking is that messages go on and off the queues in multiple of the TX size.) ) Note: the original bug also reported seeing missing messages from qpid-txtest2. I don't have a good explanation for that but since the qpid-send/receive test shows that transactions are atomic I am going to let that go for now. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1621211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp51
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h9
-rw-r--r--cpp/src/qpid/messaging/exceptions.cpp1
3 files changed, 30 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 576156db00..9299ed7cb1 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -43,7 +43,9 @@
using qpid::messaging::KeyError;
using qpid::messaging::NoMessageAvailable;
using qpid::messaging::MessagingException;
+using qpid::messaging::TransactionError;
using qpid::messaging::TransactionAborted;
+using qpid::messaging::TransactionUnknown;
using qpid::messaging::SessionError;
using qpid::messaging::MessageImplAccess;
using qpid::messaging::Sender;
@@ -56,37 +58,18 @@ namespace amqp0_10 {
typedef qpid::sys::Mutex::ScopedLock ScopedLock;
typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
-SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {}
+SessionImpl::SessionImpl(ConnectionImpl& c, bool t) :
+ connection(&c), transactional(t) {}
bool SessionImpl::isTransactional() const
{
return transactional;
}
-void SessionImpl::abortTransaction()
-{
- ScopedLock l(lock);
- aborted = true;
- checkAbortedLH(l);
-}
-
-void SessionImpl::checkAborted()
-{
- ScopedLock l(lock);
- checkAbortedLH(l);
-}
-
-void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&)
-{
- if (aborted) {
- throw TransactionAborted("Transaction aborted due to transport failure");
- }
-}
-
void SessionImpl::checkError()
{
ScopedLock l(lock);
- checkAbortedLH(l);
+ txError.raise();
qpid::client::SessionBase_0_10Access s(session);
try {
s.get()->assertOpen();
@@ -120,9 +103,19 @@ void SessionImpl::sync(bool block)
void SessionImpl::commit()
{
- if (!execute<Commit>()) {
- throw TransactionAborted("Transaction aborted due to transport failure");
+ try {
+ checkError();
+ committing = true;
+ execute<Commit>();
}
+ catch (const TransactionError&) {
+ assert(txError); // Must be set by thrower of TransactionError
+ }
+ catch (const std::exception& e) {
+ txError = new TransactionAborted(Msg() << "Transaction aborted: " << e.what());
+ }
+ committing = false;
+ checkError();
}
void SessionImpl::rollback()
@@ -385,7 +378,7 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message,
bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout)
{
while (true) {
- checkAborted();
+ txError.raise();
try {
std::string destination;
if (incoming.getNextDestination(destination, adjust(timeout))) {
@@ -568,7 +561,13 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- if (transactional) abortTransaction();
+ if (transactional) {
+ if (committing)
+ txError = new TransactionUnknown("Transaction outcome unknown: transport failure");
+ else
+ txError = new TransactionAborted("Transaction aborted: transport failure");
+ txError.raise();
+ }
connection->reopen();
}
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index b2e4cf3f78..3a160b2b91 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -30,6 +30,7 @@
#include "qpid/client/amqp0_10/IncomingMessages.h"
#include "qpid/sys/Mutex.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/sys/ExceptionHolder.h"
#include <boost/intrusive_ptr.hpp>
namespace qpid {
@@ -97,7 +98,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
template <class T> bool execute(T& f)
{
try {
- checkAborted();
+ txError.raise();
f();
return true;
} catch (const qpid::TransportFailure&) {
@@ -131,16 +132,14 @@ class SessionImpl : public qpid::messaging::SessionImpl
Receivers receivers;
Senders senders;
const bool transactional;
- bool aborted;
+ bool committing;
+ sys::ExceptionHolder txError;
bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&);
bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout);
bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
void reconnect();
bool backoff();
- void abortTransaction();
- void checkAborted();
- void checkAbortedLH(const qpid::sys::Mutex::ScopedLock&);
void commitImpl();
void rollbackImpl();
diff --git a/cpp/src/qpid/messaging/exceptions.cpp b/cpp/src/qpid/messaging/exceptions.cpp
index d21477b494..419c508626 100644
--- a/cpp/src/qpid/messaging/exceptions.cpp
+++ b/cpp/src/qpid/messaging/exceptions.cpp
@@ -53,6 +53,7 @@ SessionClosed::SessionClosed() : SessionError("Session Closed") {}
TransactionError::TransactionError(const std::string& msg) : SessionError(msg) {}
TransactionAborted::TransactionAborted(const std::string& msg) : TransactionError(msg) {}
+TransactionUnknown::TransactionUnknown(const std::string& msg) : TransactionError(msg) {}
UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {}
ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {}