diff options
| author | Alan Conway <aconway@apache.org> | 2014-08-22 14:13:14 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-08-22 14:13:14 +0000 |
| commit | 80097244af5350560a787b58a5135ae54365047a (patch) | |
| tree | 40b4925a61729fa1394789f8f8c6ff5c8429c0cd /qpid/cpp/src | |
| parent | dee3a10e026896da06ff178335c0b3f86a7e6604 (diff) | |
| download | qpid-python-80097244af5350560a787b58a5135ae54365047a.tar.gz | |
QPID-5855: JAVA Client Can not recieve message with qpid ha cluster "Session exception occured while trying to commit"
The problem: the java client sets the sync flag on tx.commit and then waits for
completion of the entire transaction. According to the 0-10 spec, this is
correct, the commit (or rollback) will not complete until all of the
transactional commands have completed. However the C++ broker was sometimes
completing a commit *before* one of the the corresponding enqueues. It issued
the completions up to the commit (because the commit is makred sync) but there
is a "hole" for the incomplete enqueue. The enqueue is not marked sync so when
this hole is filled no completion is sent and the client hangs.
Fix: make tx.commit a "sync point", that is it behaves like execution.sync and
is not completed till all preceeding commands are complete. Note tx.rollback
does not need modification as it is never completed asynchronously.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619816 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/AsyncCommandCallback.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionContext.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 32 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 16 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/AsyncCompletion.cpp | 35 |
7 files changed, 86 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp index 2b1f3ad8e4..52dab42948 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp +++ b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp @@ -28,8 +28,8 @@ namespace broker { using namespace framing; -AsyncCommandCallback::AsyncCommandCallback(SessionState& ss, Command f) : - AsyncCommandContext(ss), command(f), channel(ss.getChannel()) +AsyncCommandCallback::AsyncCommandCallback(SessionState& ss, Command f, bool sync) : + AsyncCommandContext(ss), command(f), channel(ss.getChannel()), syncPoint(sync) {} void AsyncCommandCallback::completed(bool sync) { @@ -57,8 +57,11 @@ void AsyncCommandCallback::complete() { void AsyncCommandCallback::doCommand() { SessionState* session = completerContext->getSession(); - if (session && session->isAttached()) - session->completeCommand(id, false, requiresSync, command()); + if (session && session->isAttached()) { + // Complete now unless this is a syncPoint and there are incomplete commands. + if (!(syncPoint && session->addPendingExecutionSync(id))) + session->completeCommand(id, false, requiresSync, command()); + } else throw InternalErrorException("Cannot complete command, no session"); } diff --git a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h index dd8214683a..15884ae8d9 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h +++ b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h @@ -45,7 +45,11 @@ class AsyncCommandCallback : public SessionState::AsyncCommandContext { */ typedef boost::function<std::string ()> Command; - AsyncCommandCallback(SessionState& ss, Command f); + /** + * @param syncPoint: if true have this command complete only when all + * preceeding commands are complete, like execution.sync. + */ + AsyncCommandCallback(SessionState& ss, Command f, bool syncPoint=false); void completed(bool sync); @@ -57,6 +61,7 @@ class AsyncCommandCallback : public SessionState::AsyncCommandContext { Command command; uint16_t channel; + bool syncPoint; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 83405229ed..1b3823c845 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -198,7 +198,9 @@ void SemanticState::commit(MessageStore* const store) txBuffer->startCommit(store); AsyncCommandCallback callback( session, - boost::bind(&TxBuffer::endCommit, txBuffer, store)); + boost::bind(&TxBuffer::endCommit, txBuffer, store), + true // This is a sync point + ); txBuffer->end(callback); } diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index 92a3dcecc2..53f3e1bab3 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -50,7 +50,7 @@ class SessionContext : public OwnershipToken virtual Broker& getBroker() = 0; virtual uint16_t getChannel() const = 0; virtual const SessionId& getSessionId() const = 0; - virtual void addPendingExecutionSync() = 0; + virtual bool addPendingExecutionSync() = 0; virtual void setUnackedCount(uint64_t) {} }; diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 186bf77c64..c509ae7c14 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -266,11 +266,14 @@ void SessionState::completeCommand(SequenceNumber id, // Are there any outstanding Execution.Sync commands pending the // completion of this cmd? If so, complete them. while (!pendingExecutionSyncs.empty() && - receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) { - const SequenceNumber id = pendingExecutionSyncs.front(); + (receiverGetIncomplete().empty() || + receiverGetIncomplete().front() >= pendingExecutionSyncs.front())) + { + const SequenceNumber syncId = pendingExecutionSyncs.front(); pendingExecutionSyncs.pop(); - QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed."); - receiverCompleted(id); + QPID_LOG(debug, getId() << ": delayed execution.sync " << syncId << " is completed."); + if (receiverGetIncomplete().contains(syncId)) + receiverCompleted(syncId); callSendCompletion = true; // likely peer is pending for this completion. } @@ -348,15 +351,24 @@ void SessionState::setTimeout(uint32_t) { } // Current received command is an execution.sync command. // Complete this command only when all preceding commands have completed. // (called via the invoker() in handleCommand() above) -void SessionState::addPendingExecutionSync() -{ - SequenceNumber syncCommandId = currentCommand.getId(); - if (receiverGetIncomplete().front() < syncCommandId) { +bool SessionState::addPendingExecutionSync() { + SequenceNumber id = currentCommand.getId(); + if (addPendingExecutionSync(id)) { currentCommand.setCompleteSync(false); - pendingExecutionSyncs.push(syncCommandId); + QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << id); + return true; + } + return false; +} + +bool SessionState::addPendingExecutionSync(SequenceNumber id) +{ + if (receiverGetIncomplete().front() < id) { + pendingExecutionSyncs.push(id); asyncCommandCompleter->flushPendingMessages(); - QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); + return true; } + return false; } /** factory for creating a reference-counted IncompleteIngressMsgXfer object diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index ca6d6bf530..c71c520f9c 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -116,9 +116,19 @@ class SessionState : public qpid::SessionState, const SessionId& getSessionId() const { return getId(); } - // Used by ExecutionHandler sync command processing. Notifies - // the SessionState of a received Execution.Sync command. - void addPendingExecutionSync(); + /** + * Used by ExecutionHandler sync command processing. Notifies + * the SessionState of a received Execution.Sync command. + * Return true if there are incomplete commands before the execution sync. + */ + bool addPendingExecutionSync(); + + /** + * Mark commannd ID as an execution sync point, completions will be sent + * when all commands up to that point are completed. + */ + bool addPendingExecutionSync(SequenceNumber id); + void setUnackedCount(uint64_t count) { if (mgmtObject) diff --git a/qpid/cpp/src/tests/AsyncCompletion.cpp b/qpid/cpp/src/tests/AsyncCompletion.cpp index e32097106f..dc43f10156 100644 --- a/qpid/cpp/src/tests/AsyncCompletion.cpp +++ b/qpid/cpp/src/tests/AsyncCompletion.cpp @@ -44,7 +44,8 @@ using broker::PersistableQueue; using sys::TIME_SEC; using boost::intrusive_ptr; -/** @file Unit tests for async completion. +/** @file + * Unit tests for async completion. * Using a dummy store, verify that the broker indicates async completion of * message enqueues at the correct time. */ @@ -69,6 +70,10 @@ class AsyncCompletionMessageStore : public NullMessageStore { QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite) +/** + * Send a sync after a bunch of incomplete messages, verify the sync completes + * only when all the messages are complete. + */ QPID_AUTO_TEST_CASE(testWaitTillComplete) { SessionFixture fix; AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore; @@ -104,6 +109,34 @@ QPID_AUTO_TEST_CASE(testWaitTillComplete) { sync.wait(); // Should complete now, all messages are completed. } +/** + * Send a sync after all messages are complete, verify it completes immediately. + */ +QPID_AUTO_TEST_CASE(testSyncAfterComplete) { + SessionFixture fix; + AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore; + boost::shared_ptr<qpid::broker::MessageStore> p; + p.reset(store); + fix.broker->setStore(p); + AsyncSession s = fix.session; + + static const int count = 3; + + s.queueDeclare("q", arg::durable=true); + // Transfer and complete all the messages + for (int i = 0; i < count; ++i) { + Message msg(boost::lexical_cast<string>(i), "q"); + msg.getDeliveryProperties().setDeliveryMode(PERSISTENT); + Completion transfer = s.messageTransfer(arg::content=msg, arg::sync=true); + intrusive_ptr<PersistableMessage> enqueued = store->enqueued.pop(TIME_SEC); + enqueued->enqueueComplete(); + transfer.wait(); + } + // Send a sync, make sure it completes immediately + Completion sync = s.executionSync(arg::sync=true); + sync.wait(); // Should complete now, all messages are completed. +} + QPID_AUTO_TEST_CASE(testGetResult) { SessionFixture fix; AsyncSession s = fix.session; |
