diff options
| author | Alan Conway <aconway@apache.org> | 2014-08-28 21:47:44 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-08-28 21:47:44 +0000 |
| commit | b93b20dd123757b208f9e78ef778e3648c3438a0 (patch) | |
| tree | 57c643010a5fc63792b0031eb75682ad728dae31 /cpp/src/qpid | |
| parent | 2c9370641727e5d365d0cf52e9a0ba2d5faa087b (diff) | |
| download | qpid-python-b93b20dd123757b208f9e78ef778e3648c3438a0.tar.gz | |
QPID-5975: HA extra/missing messages when running qpid-txtest2 in a loop with failover.
This is partly not-a-bug, there is a client error handling issue that has been
corrected.
qpid-txtest2 initializes a queue with messages at the start and drains the
queues at the end. These operations are *not transactional*. Therefore
duplicates are expected if there is a failover during initialization or
draining. When duplicates were observed, there was indeed a failover at one of
these times.
Making these operations transactional is not enough to pass, now we see the test
fail with "no messages to fetch". This is explained as follows:
If there is a failover during a transaction, TransactionAborted is raised. The
client assumes the transaction was rolled back and re-plays it. However, if the
failover occurs at a critical point *after* the client has sent commit
but *before* it has received a response, then the the client *does not know*
whether the transaction was committed or rolled-back on the new primary.
Re-playing in this case can duplicate the transaction. Each transaction moves
messages from one queue to another so as long as transactions are atomic the
total number of messages will not change. However, if transactions are
duplicated, a transactional session may try to move more messages than exist on
the queue, hence "no messages to fetch". For example if thread 1 moves N
messages from q1 to q2, and thread 2 tries to move N+M messages back, then
thread 2 will fail.
This problem has been corrected as follows: C++ and python clients now raise the
following exceptions:
- TransactionAborted: The transaction has definitely been rolled back due to a
connection failure before commit or a broker error (e.g. a store error) during commit.
It can safely be replayed.
- TransactionUnknown: The transaction outcome is unknown because the connection
failed at the critical time. There's no simple automatic way to know what
happened without examining the state of the broker queues.
Unfortunately With this fix qpid-txtest2 is no longer useful test for TX
failover because it regularly raises TransactionUnknown and there's not much we
can do with that.
A better test of TX atomicity with failover is to run a pair of
qpid-send/qpid-receive with fail-over and verify that the number of
enqueues/dequeues and message depth are a multiple of the transaction size. See
the JIRA for such a test. (Note these test also sometimes raise
TransactionUnknown but it doesn't matter since all we are checking is that
messages go on and off the queues in multiple of the TX size.) )
Note: the original bug also reported seeing missing messages from
qpid-txtest2. I don't have a good explanation for that but since the
qpid-send/receive test shows that transactions are atomic I am going to let that
go for now.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1621211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/messaging/exceptions.cpp | 1 |
3 files changed, 30 insertions, 31 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 576156db00..9299ed7cb1 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -43,7 +43,9 @@ using qpid::messaging::KeyError; using qpid::messaging::NoMessageAvailable; using qpid::messaging::MessagingException; +using qpid::messaging::TransactionError; using qpid::messaging::TransactionAborted; +using qpid::messaging::TransactionUnknown; using qpid::messaging::SessionError; using qpid::messaging::MessageImplAccess; using qpid::messaging::Sender; @@ -56,37 +58,18 @@ namespace amqp0_10 { typedef qpid::sys::Mutex::ScopedLock ScopedLock; typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock; -SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t), aborted(false) {} +SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : + connection(&c), transactional(t) {} bool SessionImpl::isTransactional() const { return transactional; } -void SessionImpl::abortTransaction() -{ - ScopedLock l(lock); - aborted = true; - checkAbortedLH(l); -} - -void SessionImpl::checkAborted() -{ - ScopedLock l(lock); - checkAbortedLH(l); -} - -void SessionImpl::checkAbortedLH(const qpid::sys::Mutex::ScopedLock&) -{ - if (aborted) { - throw TransactionAborted("Transaction aborted due to transport failure"); - } -} - void SessionImpl::checkError() { ScopedLock l(lock); - checkAbortedLH(l); + txError.raise(); qpid::client::SessionBase_0_10Access s(session); try { s.get()->assertOpen(); @@ -120,9 +103,19 @@ void SessionImpl::sync(bool block) void SessionImpl::commit() { - if (!execute<Commit>()) { - throw TransactionAborted("Transaction aborted due to transport failure"); + try { + checkError(); + committing = true; + execute<Commit>(); } + catch (const TransactionError&) { + assert(txError); // Must be set by thrower of TransactionError + } + catch (const std::exception& e) { + txError = new TransactionAborted(Msg() << "Transaction aborted: " << e.what()); + } + committing = false; + checkError(); } void SessionImpl::rollback() @@ -385,7 +378,7 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout) { while (true) { - checkAborted(); + txError.raise(); try { std::string destination; if (incoming.getNextDestination(destination, adjust(timeout))) { @@ -568,7 +561,13 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - if (transactional) abortTransaction(); + if (transactional) { + if (committing) + txError = new TransactionUnknown("Transaction outcome unknown: transport failure"); + else + txError = new TransactionAborted("Transaction aborted: transport failure"); + txError.raise(); + } connection->reopen(); } diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index b2e4cf3f78..3a160b2b91 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -30,6 +30,7 @@ #include "qpid/client/amqp0_10/IncomingMessages.h" #include "qpid/sys/Mutex.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/ExceptionHolder.h" #include <boost/intrusive_ptr.hpp> namespace qpid { @@ -97,7 +98,7 @@ class SessionImpl : public qpid::messaging::SessionImpl template <class T> bool execute(T& f) { try { - checkAborted(); + txError.raise(); f(); return true; } catch (const qpid::TransportFailure&) { @@ -131,16 +132,14 @@ class SessionImpl : public qpid::messaging::SessionImpl Receivers receivers; Senders senders; const bool transactional; - bool aborted; + bool committing; + sys::ExceptionHolder txError; bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&); bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout); bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); bool backoff(); - void abortTransaction(); - void checkAborted(); - void checkAbortedLH(const qpid::sys::Mutex::ScopedLock&); void commitImpl(); void rollbackImpl(); diff --git a/cpp/src/qpid/messaging/exceptions.cpp b/cpp/src/qpid/messaging/exceptions.cpp index d21477b494..419c508626 100644 --- a/cpp/src/qpid/messaging/exceptions.cpp +++ b/cpp/src/qpid/messaging/exceptions.cpp @@ -53,6 +53,7 @@ SessionClosed::SessionClosed() : SessionError("Session Closed") {} TransactionError::TransactionError(const std::string& msg) : SessionError(msg) {} TransactionAborted::TransactionAborted(const std::string& msg) : TransactionError(msg) {} +TransactionUnknown::TransactionUnknown(const std::string& msg) : TransactionError(msg) {} UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {} ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {} |
