diff options
| author | Alan Conway <aconway@apache.org> | 2011-08-30 19:35:36 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2011-08-30 19:35:36 +0000 |
| commit | 29d42b92fe4ce26a9e2e17d95c16dcb9daf45a0e (patch) | |
| tree | 90197ad29f636385177a55b3876906042114535c /cpp/src/qpid/cluster/UpdateClient.cpp | |
| parent | 9888c13e5e0a51895cd20ccb1bc5fc95e2a27091 (diff) | |
| download | qpid-python-29d42b92fe4ce26a9e2e17d95c16dcb9daf45a0e.tar.gz | |
QPID-3384: DTX transactions - replicate suspended transactions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1163347 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 28 |
1 files changed, 20 insertions, 8 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index a5662bb2b3..3142b44d71 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -506,7 +506,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { std::max(received, ss->receiverGetExpected().command), received, ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete() + ss->receiverGetIncomplete(), + ss->getSemanticState().getDtxSelected() ); // Send frames for partial message in progress. @@ -552,7 +553,6 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, client::AsyncSession& updateSession) { if (!dr.isEnded() && dr.isAcquired()) { - // FIXME aconway 2011-08-19: should this be assert or if? assert(dr.getMessage().payload); // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. @@ -621,22 +621,34 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { ClusterConnectionProxy proxy; }; +void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended) +{ + ClusterConnectionProxy proxy(shadowSession); + broker::DtxWorkRecord* record = + updaterBroker.getDtxManager().getWork(dtx->getXid()); + proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended); + +} + void UpdateClient::updateTransactionState(broker::SemanticState& s) { - broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); - broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); + broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); + broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); if (dtx) { - broker::DtxWorkRecord* record = - updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found - proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx)); + updateBufferRef(dtx, false); // Current transaction. } else if (tx) { - ClusterConnectionProxy proxy(shadowSession); proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); tx->accept(updater); proxy.txEnd(); } + for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin(); + i != s.getSuspendedXids().end(); + ++i) + { + updateBufferRef(i->second, true); + } } void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { |
