From 29d42b92fe4ce26a9e2e17d95c16dcb9daf45a0e Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 30 Aug 2011 19:35:36 +0000 Subject: QPID-3384: DTX transactions - replicate suspended transactions. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1163347 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/UpdateClient.cpp | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp') diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index a5662bb2b3..3142b44d71 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -506,7 +506,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { std::max(received, ss->receiverGetExpected().command), received, ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete() + ss->receiverGetIncomplete(), + ss->getSemanticState().getDtxSelected() ); // Send frames for partial message in progress. @@ -552,7 +553,6 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, client::AsyncSession& updateSession) { if (!dr.isEnded() && dr.isAcquired()) { - // FIXME aconway 2011-08-19: should this be assert or if? assert(dr.getMessage().payload); // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. @@ -621,22 +621,34 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { ClusterConnectionProxy proxy; }; +void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended) +{ + ClusterConnectionProxy proxy(shadowSession); + broker::DtxWorkRecord* record = + updaterBroker.getDtxManager().getWork(dtx->getXid()); + proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended); + +} + void UpdateClient::updateTransactionState(broker::SemanticState& s) { - broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); - broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); + broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); + broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); if (dtx) { - broker::DtxWorkRecord* record = - updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found - proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx)); + updateBufferRef(dtx, false); // Current transaction. } else if (tx) { - ClusterConnectionProxy proxy(shadowSession); proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); tx->accept(updater); proxy.txEnd(); } + for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin(); + i != s.getSuspendedXids().end(); + ++i) + { + updateBufferRef(i->second, true); + } } void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { -- cgit v1.2.1