summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-08-22 14:13:14 +0000
committerAlan Conway <aconway@apache.org>2014-08-22 14:13:14 +0000
commit80097244af5350560a787b58a5135ae54365047a (patch)
tree40b4925a61729fa1394789f8f8c6ff5c8429c0cd /qpid/cpp/src
parentdee3a10e026896da06ff178335c0b3f86a7e6604 (diff)
downloadqpid-python-80097244af5350560a787b58a5135ae54365047a.tar.gz
QPID-5855: JAVA Client Can not recieve message with qpid ha cluster "Session exception occured while trying to commit"
The problem: the java client sets the sync flag on tx.commit and then waits for completion of the entire transaction. According to the 0-10 spec, this is correct, the commit (or rollback) will not complete until all of the transactional commands have completed. However the C++ broker was sometimes completing a commit *before* one of the the corresponding enqueues. It issued the completions up to the commit (because the commit is makred sync) but there is a "hole" for the incomplete enqueue. The enqueue is not marked sync so when this hole is filled no completion is sent and the client hangs. Fix: make tx.commit a "sync point", that is it behaves like execution.sync and is not completed till all preceeding commands are complete. Note tx.rollback does not need modification as it is never completed asynchronously. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619816 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCommandCallback.h7
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h16
-rw-r--r--qpid/cpp/src/tests/AsyncCompletion.cpp35
7 files changed, 86 insertions, 21 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
index 2b1f3ad8e4..52dab42948 100644
--- a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
+++ b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.cpp
@@ -28,8 +28,8 @@ namespace broker {
using namespace framing;
-AsyncCommandCallback::AsyncCommandCallback(SessionState& ss, Command f) :
- AsyncCommandContext(ss), command(f), channel(ss.getChannel())
+AsyncCommandCallback::AsyncCommandCallback(SessionState& ss, Command f, bool sync) :
+ AsyncCommandContext(ss), command(f), channel(ss.getChannel()), syncPoint(sync)
{}
void AsyncCommandCallback::completed(bool sync) {
@@ -57,8 +57,11 @@ void AsyncCommandCallback::complete() {
void AsyncCommandCallback::doCommand() {
SessionState* session = completerContext->getSession();
- if (session && session->isAttached())
- session->completeCommand(id, false, requiresSync, command());
+ if (session && session->isAttached()) {
+ // Complete now unless this is a syncPoint and there are incomplete commands.
+ if (!(syncPoint && session->addPendingExecutionSync(id)))
+ session->completeCommand(id, false, requiresSync, command());
+ }
else
throw InternalErrorException("Cannot complete command, no session");
}
diff --git a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h
index dd8214683a..15884ae8d9 100644
--- a/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h
+++ b/qpid/cpp/src/qpid/broker/AsyncCommandCallback.h
@@ -45,7 +45,11 @@ class AsyncCommandCallback : public SessionState::AsyncCommandContext {
*/
typedef boost::function<std::string ()> Command;
- AsyncCommandCallback(SessionState& ss, Command f);
+ /**
+ * @param syncPoint: if true have this command complete only when all
+ * preceeding commands are complete, like execution.sync.
+ */
+ AsyncCommandCallback(SessionState& ss, Command f, bool syncPoint=false);
void completed(bool sync);
@@ -57,6 +61,7 @@ class AsyncCommandCallback : public SessionState::AsyncCommandContext {
Command command;
uint16_t channel;
+ bool syncPoint;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 83405229ed..1b3823c845 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -198,7 +198,9 @@ void SemanticState::commit(MessageStore* const store)
txBuffer->startCommit(store);
AsyncCommandCallback callback(
session,
- boost::bind(&TxBuffer::endCommit, txBuffer, store));
+ boost::bind(&TxBuffer::endCommit, txBuffer, store),
+ true // This is a sync point
+ );
txBuffer->end(callback);
}
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index 92a3dcecc2..53f3e1bab3 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -50,7 +50,7 @@ class SessionContext : public OwnershipToken
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
virtual const SessionId& getSessionId() const = 0;
- virtual void addPendingExecutionSync() = 0;
+ virtual bool addPendingExecutionSync() = 0;
virtual void setUnackedCount(uint64_t) {}
};
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 186bf77c64..c509ae7c14 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -266,11 +266,14 @@ void SessionState::completeCommand(SequenceNumber id,
// Are there any outstanding Execution.Sync commands pending the
// completion of this cmd? If so, complete them.
while (!pendingExecutionSyncs.empty() &&
- receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) {
- const SequenceNumber id = pendingExecutionSyncs.front();
+ (receiverGetIncomplete().empty() ||
+ receiverGetIncomplete().front() >= pendingExecutionSyncs.front()))
+ {
+ const SequenceNumber syncId = pendingExecutionSyncs.front();
pendingExecutionSyncs.pop();
- QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed.");
- receiverCompleted(id);
+ QPID_LOG(debug, getId() << ": delayed execution.sync " << syncId << " is completed.");
+ if (receiverGetIncomplete().contains(syncId))
+ receiverCompleted(syncId);
callSendCompletion = true; // likely peer is pending for this completion.
}
@@ -348,15 +351,24 @@ void SessionState::setTimeout(uint32_t) { }
// Current received command is an execution.sync command.
// Complete this command only when all preceding commands have completed.
// (called via the invoker() in handleCommand() above)
-void SessionState::addPendingExecutionSync()
-{
- SequenceNumber syncCommandId = currentCommand.getId();
- if (receiverGetIncomplete().front() < syncCommandId) {
+bool SessionState::addPendingExecutionSync() {
+ SequenceNumber id = currentCommand.getId();
+ if (addPendingExecutionSync(id)) {
currentCommand.setCompleteSync(false);
- pendingExecutionSyncs.push(syncCommandId);
+ QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << id);
+ return true;
+ }
+ return false;
+}
+
+bool SessionState::addPendingExecutionSync(SequenceNumber id)
+{
+ if (receiverGetIncomplete().front() < id) {
+ pendingExecutionSyncs.push(id);
asyncCommandCompleter->flushPendingMessages();
- QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId);
+ return true;
}
+ return false;
}
/** factory for creating a reference-counted IncompleteIngressMsgXfer object
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index ca6d6bf530..c71c520f9c 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -116,9 +116,19 @@ class SessionState : public qpid::SessionState,
const SessionId& getSessionId() const { return getId(); }
- // Used by ExecutionHandler sync command processing. Notifies
- // the SessionState of a received Execution.Sync command.
- void addPendingExecutionSync();
+ /**
+ * Used by ExecutionHandler sync command processing. Notifies
+ * the SessionState of a received Execution.Sync command.
+ * Return true if there are incomplete commands before the execution sync.
+ */
+ bool addPendingExecutionSync();
+
+ /**
+ * Mark commannd ID as an execution sync point, completions will be sent
+ * when all commands up to that point are completed.
+ */
+ bool addPendingExecutionSync(SequenceNumber id);
+
void setUnackedCount(uint64_t count) {
if (mgmtObject)
diff --git a/qpid/cpp/src/tests/AsyncCompletion.cpp b/qpid/cpp/src/tests/AsyncCompletion.cpp
index e32097106f..dc43f10156 100644
--- a/qpid/cpp/src/tests/AsyncCompletion.cpp
+++ b/qpid/cpp/src/tests/AsyncCompletion.cpp
@@ -44,7 +44,8 @@ using broker::PersistableQueue;
using sys::TIME_SEC;
using boost::intrusive_ptr;
-/** @file Unit tests for async completion.
+/** @file
+ * Unit tests for async completion.
* Using a dummy store, verify that the broker indicates async completion of
* message enqueues at the correct time.
*/
@@ -69,6 +70,10 @@ class AsyncCompletionMessageStore : public NullMessageStore {
QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite)
+/**
+ * Send a sync after a bunch of incomplete messages, verify the sync completes
+ * only when all the messages are complete.
+ */
QPID_AUTO_TEST_CASE(testWaitTillComplete) {
SessionFixture fix;
AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore;
@@ -104,6 +109,34 @@ QPID_AUTO_TEST_CASE(testWaitTillComplete) {
sync.wait(); // Should complete now, all messages are completed.
}
+/**
+ * Send a sync after all messages are complete, verify it completes immediately.
+ */
+QPID_AUTO_TEST_CASE(testSyncAfterComplete) {
+ SessionFixture fix;
+ AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore;
+ boost::shared_ptr<qpid::broker::MessageStore> p;
+ p.reset(store);
+ fix.broker->setStore(p);
+ AsyncSession s = fix.session;
+
+ static const int count = 3;
+
+ s.queueDeclare("q", arg::durable=true);
+ // Transfer and complete all the messages
+ for (int i = 0; i < count; ++i) {
+ Message msg(boost::lexical_cast<string>(i), "q");
+ msg.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+ Completion transfer = s.messageTransfer(arg::content=msg, arg::sync=true);
+ intrusive_ptr<PersistableMessage> enqueued = store->enqueued.pop(TIME_SEC);
+ enqueued->enqueueComplete();
+ transfer.wait();
+ }
+ // Send a sync, make sure it completes immediately
+ Completion sync = s.executionSync(arg::sync=true);
+ sync.wait(); // Should complete now, all messages are completed.
+}
+
QPID_AUTO_TEST_CASE(testGetResult) {
SessionFixture fix;
AsyncSession s = fix.session;