summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-08-25 20:41:28 +0000
committerAlan Conway <aconway@apache.org>2011-08-25 20:41:28 +0000
commit2fdd2cc2ade41e213ae35818532574bbf40f4a00 (patch)
tree42fb45022ea08fee157abf50713b452acf5eda5d /cpp/src/qpid/cluster/UpdateClient.cpp
parent7f99badd1c330b3a6032b15a13aca1cde81274d3 (diff)
downloadqpid-python-2fdd2cc2ade41e213ae35818532574bbf40f4a00.tar.gz
QPID-3384: Enable DTX transactions in a cluster.
- Replicate DTX state to new members joining. - Use cluster timer for DTX timeouts. - Incidental: quote nulls in qpid::Msg messages (XIDs often have null characters) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1161742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp81
1 files changed, 62 insertions, 19 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index fc104e8ca9..a5662bb2b3 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -45,6 +45,8 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/TxOpVisitor.h"
#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
+#include "qpid/broker/DtxWorkRecord.h"
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/TxPublish.h"
#include "qpid/broker/RecoveredDequeue.h"
@@ -65,6 +67,7 @@
#include <boost/bind.hpp>
#include <boost/cast.hpp>
#include <algorithm>
+#include <iterator>
#include <sstream>
namespace qpid {
@@ -177,9 +180,9 @@ void UpdateClient::update() {
// longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
+
std::for_each(connections.begin(), connections.end(),
boost::bind(&UpdateClient::updateConnection, this, _1));
- session.queueDelete(arg::queue=UPDATE);
// some Queue Observers need session state & msgs synced first, so sync observers now
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
@@ -189,6 +192,8 @@ void UpdateClient::update() {
updateLinks();
updateManagementAgent();
+ updateDtxManager();
+ session.queueDelete(arg::queue=UPDATE);
session.close();
@@ -356,7 +361,8 @@ class MessageUpdater {
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
{
AMQFrame frame((AMQContentBody()));
- morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
+ morecontent = message.payload->getContentFrame(
+ *(message.queue), frame, maxContentSize, offset);
sb.get()->sendRawFrame(frame);
}
}
@@ -479,9 +485,9 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
QPID_LOG(debug, *this << " updating unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
std::for_each(drs.begin(), drs.end(),
- boost::bind(&UpdateClient::updateUnacked, this, _1));
+ boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession));
- updateTxState(ss->getSemanticState()); // Tx transaction state.
+ updateTransactionState(ss->getSemanticState());
// Adjust command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -542,14 +548,18 @@ void UpdateClient::updateConsumer(
<< " on " << shadowSession.getId());
}
-void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
- if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
+void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
+ client::AsyncSession& updateSession)
+{
+ if (!dr.isEnded() && dr.isAcquired()) {
+ // FIXME aconway 2011-08-19: should this be assert or if?
+ assert(dr.getMessage().payload);
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
//
- MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
+ MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage());
}
- ClusterConnectionProxy(shadowSession).deliveryRecord(
+ ClusterConnectionProxy(updateSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
dr.getTag(),
@@ -570,8 +580,10 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
: MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
- void operator()(const broker::DtxAck& ) {
- throw InternalErrorException("DTX transactions not currently supported by cluster.");
+ void operator()(const broker::DtxAck& ack) {
+ std::for_each(ack.getPending().begin(), ack.getPending().end(),
+ boost::bind(&UpdateClient::updateUnacked, &parent, _1, session));
+ proxy.dtxAck();
}
void operator()(const broker::RecoveredDequeue& rdeq) {
@@ -588,13 +600,18 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
proxy.txAccept(txAccept.getAcked());
}
+ typedef std::list<Queue::shared_ptr> QueueList;
+
+ void copy(const QueueList& l, Array& a) {
+ for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i)
+ a.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+ }
+
void operator()(const broker::TxPublish& txPub) {
updateMessage(txPub.getMessage());
- typedef std::list<Queue::shared_ptr> QueueList;
- const QueueList& qlist = txPub.getQueues();
+ assert(txPub.getQueues().empty() || txPub.getPrepared().empty());
Array qarray(TYPE_CODE_STR8);
- for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
- qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+ copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray);
proxy.txPublish(qarray, txPub.delivered);
}
@@ -604,19 +621,33 @@ class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
ClusterConnectionProxy proxy;
};
-void UpdateClient::updateTxState(broker::SemanticState& s) {
- QPID_LOG(debug, *this << " updating TX transaction state.");
+void UpdateClient::updateTransactionState(broker::SemanticState& s) {
+ broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
+ broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
- broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
- if (txBuffer) {
+ if (dtx) {
+ broker::DtxWorkRecord* record =
+ updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found
+ proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx));
+ } else if (tx) {
+ ClusterConnectionProxy proxy(shadowSession);
proxy.txStart();
TxOpUpdater updater(*this, shadowSession, expiry);
- txBuffer->accept(updater);
+ tx->accept(updater);
proxy.txEnd();
}
}
+void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {
+ ClusterConnectionProxy proxy(session);
+ proxy.dtxStart(
+ dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired());
+ TxOpUpdater updater(*this, session, expiry);
+ dtx->accept(updater);
+ proxy.dtxEnd();
+}
+
void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) {
queue->getListeners().eachListener(
boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1));
@@ -667,5 +698,17 @@ void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q,
}
}
+void UpdateClient::updateDtxManager() {
+ broker::DtxManager& dtm = updaterBroker.getDtxManager();
+ dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1));
+}
+
+void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) {
+ QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid());
+ for (size_t i = 0; i < r.size(); ++i)
+ updateDtxBuffer(r[i]);
+ ClusterConnectionProxy(session).dtxWorkRecord(
+ r.getXid(), r.isPrepared(), r.getTimeout());
+}
}} // namespace qpid::cluster