diff options
| author | Alan Conway <aconway@apache.org> | 2011-08-30 19:35:36 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2011-08-30 19:35:36 +0000 |
| commit | 29d42b92fe4ce26a9e2e17d95c16dcb9daf45a0e (patch) | |
| tree | 90197ad29f636385177a55b3876906042114535c /cpp/src/qpid | |
| parent | 9888c13e5e0a51895cd20ccb1bc5fc95e2a27091 (diff) | |
| download | qpid-python-29d42b92fe4ce26a9e2e17d95c16dcb9daf45a0e.tar.gz | |
QPID-3384: DTX transactions - replicate suspended transactions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1163347 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 29 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateReceiver.h | 15 |
6 files changed, 58 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 8de884c113..22bc272c50 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -148,9 +148,10 @@ class SemanticState : private boost::noncopyable { management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; + typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; + private: typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; - typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; SessionContext& session; DeliveryAdapter& deliveryAdapter; @@ -181,6 +182,7 @@ class SemanticState : private boost::noncopyable { void disable(ConsumerImpl::shared_ptr); public: + SemanticState(DeliveryAdapter&, SessionContext&); ~SemanticState(); @@ -218,6 +220,7 @@ class SemanticState : private boost::noncopyable { void commit(MessageStore* const store); void rollback(); void selectDtx(); + bool getDtxSelected() const { return dtxSelected; } void startDtx(const std::string& xid, DtxManager& mgr, bool join); void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); @@ -249,6 +252,7 @@ class SemanticState : private boost::noncopyable { void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; } void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); + DtxBufferMap& getSuspendedXids() { return suspendedXids; } }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 0691aae711..e0ce8cf9e0 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -419,7 +419,8 @@ void Connection::sessionState( const SequenceNumber& expected, const SequenceNumber& received, const SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete) + const SequenceSet& receivedIncomplete, + bool dtxSelected) { sessionState().setState( replayStart, @@ -429,7 +430,9 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); + if (dtxSelected) semanticState().selectDtx(); + QPID_LOG(debug, cluster << " received session state update for " + << sessionState().getId()); // The output tasks will be added later in the update process. connection->getOutputTasks().removeAll(); } @@ -459,11 +462,14 @@ void Connection::shadowReady( output.setSendMax(sendMax); } -void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) { +void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) { broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); - broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID - uint32_t index = v.first.second; // Index - v.second->setDtxBuffer((*record)[index]); + broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid); + broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index]; + if (bufRef.suspended) + bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer; + else + bufRef.semanticState->setDtxBuffer(buffer); } // Marks the end of the update. @@ -694,11 +700,12 @@ void Connection::dtxAck() { dtxAckRecords.clear(); } -void Connection::dtxBufferRef(const std::string& xid, uint32_t index) { - // Save the association between DtxBuffer and session so we can - // set the DtxBuffer on the session at the end of the update - // when the DtxManager has been replicated. - updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState(); +void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) { + // Save the association between DtxBuffers and the session so we + // can set the DtxBuffers at the end of the update when the + // DtxManager has been replicated. + updateIn.dtxBuffers.push_back( + UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState())); } // Sent at end of work record. diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 5133e4641e..fe66b77238 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -124,7 +124,8 @@ class Connection : const framing::SequenceNumber& expected, const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete); + const SequenceSet& receivedIncomplete, + bool dtxSelected); void outputTask(uint16_t channel, const std::string& name); @@ -173,7 +174,7 @@ class Connection : bool expired); void dtxEnd(); void dtxAck(); - void dtxBufferRef(const std::string& xid, uint32_t index); + void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended); void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout); // Encoded exchange replication. diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index a5662bb2b3..3142b44d71 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -506,7 +506,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { std::max(received, ss->receiverGetExpected().command), received, ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete() + ss->receiverGetIncomplete(), + ss->getSemanticState().getDtxSelected() ); // Send frames for partial message in progress. @@ -552,7 +553,6 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr, client::AsyncSession& updateSession) { if (!dr.isEnded() && dr.isAcquired()) { - // FIXME aconway 2011-08-19: should this be assert or if? assert(dr.getMessage().payload); // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. @@ -621,22 +621,34 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { ClusterConnectionProxy proxy; }; +void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended) +{ + ClusterConnectionProxy proxy(shadowSession); + broker::DtxWorkRecord* record = + updaterBroker.getDtxManager().getWork(dtx->getXid()); + proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended); + +} + void UpdateClient::updateTransactionState(broker::SemanticState& s) { - broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); - broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); + broker::TxBuffer::shared_ptr tx = s.getTxBuffer(); + broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer(); if (dtx) { - broker::DtxWorkRecord* record = - updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found - proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx)); + updateBufferRef(dtx, false); // Current transaction. } else if (tx) { - ClusterConnectionProxy proxy(shadowSession); proxy.txStart(); TxOpUpdater updater(*this, shadowSession, expiry); tx->accept(updater); proxy.txEnd(); } + for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin(); + i != s.getSuspendedXids().end(); + ++i) + { + updateBufferRef(i->second, true); + } } void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) { diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 83d4cfac81..481ee357c7 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -100,6 +100,7 @@ class UpdateClient : public sys::Runnable { void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding); void updateConnection(const boost::intrusive_ptr<Connection>& connection); void updateSession(broker::SessionHandler& s); + void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended); void updateTransactionState(broker::SemanticState& s); void updateOutputTask(const sys::OutputTask* task); void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h index 512e59e5a1..81ee3a5ffe 100644 --- a/cpp/src/qpid/cluster/UpdateReceiver.h +++ b/cpp/src/qpid/cluster/UpdateReceiver.h @@ -40,11 +40,18 @@ class UpdateReceiver { /** Management-id for the next shadow connection */ std::string nextShadowMgmtId; - /** Relationship between DtxBuffers, identified by xid, index in DtxManager, - * and sessions represented by their SemanticState. + /** Record the position of a DtxBuffer in the DtxManager (xid + index) + * and the association with a session, either suspended or current. */ - typedef std::pair<std::string, uint32_t> DtxBufferRef; - typedef std::map<DtxBufferRef, broker::SemanticState* > DtxBuffers; + struct DtxBufferRef { + std::string xid; + uint32_t index; // Index in WorkRecord in DtxManager + bool suspended; // Is this a suspended or current transaction? + broker::SemanticState* semanticState; // Associated session + DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss) + : xid(x), index(i), suspended(s), semanticState(ss) {} + }; + typedef std::vector<DtxBufferRef> DtxBuffers; DtxBuffers dtxBuffers; }; }} // namespace qpid::cluster |
