diff options
| author | Alan Conway <aconway@apache.org> | 2011-08-30 19:35:36 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2011-08-30 19:35:36 +0000 |
| commit | 29d42b92fe4ce26a9e2e17d95c16dcb9daf45a0e (patch) | |
| tree | 90197ad29f636385177a55b3876906042114535c /cpp/src/qpid/cluster/Connection.cpp | |
| parent | 9888c13e5e0a51895cd20ccb1bc5fc95e2a27091 (diff) | |
| download | qpid-python-29d42b92fe4ce26a9e2e17d95c16dcb9daf45a0e.tar.gz | |
QPID-3384: DTX transactions - replicate suspended transactions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1163347 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 0691aae711..e0ce8cf9e0 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -419,7 +419,8 @@ void Connection::sessionState( const SequenceNumber& expected, const SequenceNumber& received, const SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete) + const SequenceSet& receivedIncomplete, + bool dtxSelected) { sessionState().setState( replayStart, @@ -429,7 +430,9 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); + if (dtxSelected) semanticState().selectDtx(); + QPID_LOG(debug, cluster << " received session state update for " + << sessionState().getId()); // The output tasks will be added later in the update process. connection->getOutputTasks().removeAll(); } @@ -459,11 +462,14 @@ void Connection::shadowReady( output.setSendMax(sendMax); } -void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) { +void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) { broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); - broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID - uint32_t index = v.first.second; // Index - v.second->setDtxBuffer((*record)[index]); + broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid); + broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index]; + if (bufRef.suspended) + bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer; + else + bufRef.semanticState->setDtxBuffer(buffer); } // Marks the end of the update. @@ -694,11 +700,12 @@ void Connection::dtxAck() { dtxAckRecords.clear(); } -void Connection::dtxBufferRef(const std::string& xid, uint32_t index) { - // Save the association between DtxBuffer and session so we can - // set the DtxBuffer on the session at the end of the update - // when the DtxManager has been replicated. - updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState(); +void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) { + // Save the association between DtxBuffers and the session so we + // can set the DtxBuffers at the end of the update when the + // DtxManager has been replicated. + updateIn.dtxBuffers.push_back( + UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState())); } // Sent at end of work record. |
