summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-08-30 19:35:36 +0000
committerAlan Conway <aconway@apache.org>2011-08-30 19:35:36 +0000
commit29d42b92fe4ce26a9e2e17d95c16dcb9daf45a0e (patch)
tree90197ad29f636385177a55b3876906042114535c /cpp/src/qpid
parent9888c13e5e0a51895cd20ccb1bc5fc95e2a27091 (diff)
downloadqpid-python-29d42b92fe4ce26a9e2e17d95c16dcb9daf45a0e.tar.gz
QPID-3384: DTX transactions - replicate suspended transactions.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1163347 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/SemanticState.h6
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp29
-rw-r--r--cpp/src/qpid/cluster/Connection.h5
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp28
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h1
-rw-r--r--cpp/src/qpid/cluster/UpdateReceiver.h15
6 files changed, 58 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 8de884c113..22bc272c50 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -148,9 +148,10 @@ class SemanticState : private boost::noncopyable {
management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
+ typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
+
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
- typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
SessionContext& session;
DeliveryAdapter& deliveryAdapter;
@@ -181,6 +182,7 @@ class SemanticState : private boost::noncopyable {
void disable(ConsumerImpl::shared_ptr);
public:
+
SemanticState(DeliveryAdapter&, SessionContext&);
~SemanticState();
@@ -218,6 +220,7 @@ class SemanticState : private boost::noncopyable {
void commit(MessageStore* const store);
void rollback();
void selectDtx();
+ bool getDtxSelected() const { return dtxSelected; }
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
@@ -249,6 +252,7 @@ class SemanticState : private boost::noncopyable {
void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
+ DtxBufferMap& getSuspendedXids() { return suspendedXids; }
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 0691aae711..e0ce8cf9e0 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -419,7 +419,8 @@ void Connection::sessionState(
const SequenceNumber& expected,
const SequenceNumber& received,
const SequenceSet& unknownCompleted,
- const SequenceSet& receivedIncomplete)
+ const SequenceSet& receivedIncomplete,
+ bool dtxSelected)
{
sessionState().setState(
replayStart,
@@ -429,7 +430,9 @@ void Connection::sessionState(
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+ if (dtxSelected) semanticState().selectDtx();
+ QPID_LOG(debug, cluster << " received session state update for "
+ << sessionState().getId());
// The output tasks will be added later in the update process.
connection->getOutputTasks().removeAll();
}
@@ -459,11 +462,14 @@ void Connection::shadowReady(
output.setSendMax(sendMax);
}
-void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) {
+void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) {
broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
- broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID
- uint32_t index = v.first.second; // Index
- v.second->setDtxBuffer((*record)[index]);
+ broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid);
+ broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index];
+ if (bufRef.suspended)
+ bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer;
+ else
+ bufRef.semanticState->setDtxBuffer(buffer);
}
// Marks the end of the update.
@@ -694,11 +700,12 @@ void Connection::dtxAck() {
dtxAckRecords.clear();
}
-void Connection::dtxBufferRef(const std::string& xid, uint32_t index) {
- // Save the association between DtxBuffer and session so we can
- // set the DtxBuffer on the session at the end of the update
- // when the DtxManager has been replicated.
- updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState();
+void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) {
+ // Save the association between DtxBuffers and the session so we
+ // can set the DtxBuffers at the end of the update when the
+ // DtxManager has been replicated.
+ updateIn.dtxBuffers.push_back(
+ UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState()));
}
// Sent at end of work record.
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 5133e4641e..fe66b77238 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -124,7 +124,8 @@ class Connection :
const framing::SequenceNumber& expected,
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted,
- const SequenceSet& receivedIncomplete);
+ const SequenceSet& receivedIncomplete,
+ bool dtxSelected);
void outputTask(uint16_t channel, const std::string& name);
@@ -173,7 +174,7 @@ class Connection :
bool expired);
void dtxEnd();
void dtxAck();
- void dtxBufferRef(const std::string& xid, uint32_t index);
+ void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended);
void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout);
// Encoded exchange replication.
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index a5662bb2b3..3142b44d71 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -506,7 +506,8 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
std::max(received, ss->receiverGetExpected().command),
received,
ss->receiverGetUnknownComplete(),
- ss->receiverGetIncomplete()
+ ss->receiverGetIncomplete(),
+ ss->getSemanticState().getDtxSelected()
);
// Send frames for partial message in progress.
@@ -552,7 +553,6 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
client::AsyncSession& updateSession)
{
if (!dr.isEnded() && dr.isAcquired()) {
- // FIXME aconway 2011-08-19: should this be assert or if?
assert(dr.getMessage().payload);
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
@@ -621,22 +621,34 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
ClusterConnectionProxy proxy;
};
+void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended)
+{
+ ClusterConnectionProxy proxy(shadowSession);
+ broker::DtxWorkRecord* record =
+ updaterBroker.getDtxManager().getWork(dtx->getXid());
+ proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended);
+
+}
+
void UpdateClient::updateTransactionState(broker::SemanticState& s) {
- broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
- broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
+ broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
+ broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
if (dtx) {
- broker::DtxWorkRecord* record =
- updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found
- proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx));
+ updateBufferRef(dtx, false); // Current transaction.
} else if (tx) {
- ClusterConnectionProxy proxy(shadowSession);
proxy.txStart();
TxOpUpdater updater(*this, shadowSession, expiry);
tx->accept(updater);
proxy.txEnd();
}
+ for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin();
+ i != s.getSuspendedXids().end();
+ ++i)
+ {
+ updateBufferRef(i->second, true);
+ }
}
void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 83d4cfac81..481ee357c7 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -100,6 +100,7 @@ class UpdateClient : public sys::Runnable {
void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding);
void updateConnection(const boost::intrusive_ptr<Connection>& connection);
void updateSession(broker::SessionHandler& s);
+ void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended);
void updateTransactionState(broker::SemanticState& s);
void updateOutputTask(const sys::OutputTask* task);
void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h
index 512e59e5a1..81ee3a5ffe 100644
--- a/cpp/src/qpid/cluster/UpdateReceiver.h
+++ b/cpp/src/qpid/cluster/UpdateReceiver.h
@@ -40,11 +40,18 @@ class UpdateReceiver {
/** Management-id for the next shadow connection */
std::string nextShadowMgmtId;
- /** Relationship between DtxBuffers, identified by xid, index in DtxManager,
- * and sessions represented by their SemanticState.
+ /** Record the position of a DtxBuffer in the DtxManager (xid + index)
+ * and the association with a session, either suspended or current.
*/
- typedef std::pair<std::string, uint32_t> DtxBufferRef;
- typedef std::map<DtxBufferRef, broker::SemanticState* > DtxBuffers;
+ struct DtxBufferRef {
+ std::string xid;
+ uint32_t index; // Index in WorkRecord in DtxManager
+ bool suspended; // Is this a suspended or current transaction?
+ broker::SemanticState* semanticState; // Associated session
+ DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss)
+ : xid(x), index(i), suspended(s), semanticState(ss) {}
+ };
+ typedef std::vector<DtxBufferRef> DtxBuffers;
DtxBuffers dtxBuffers;
};
}} // namespace qpid::cluster