summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp35
1 files changed, 25 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index e256566d35..c1f0b44ed4 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -38,6 +38,7 @@
#include "Connection.h"
#include "DeliverableMessage.h"
#include "DtxAck.h"
+#include "DtxTimeout.h"
#include "MessageStore.h"
#include "TxAck.h"
#include "TxPublish.h"
@@ -154,18 +155,15 @@ void Channel::endDtx(const std::string& xid, bool fail){
% dtxBuffer->getXid() % xid);
}
+ txBuffer.reset();//ops on this channel no longer transactional
+
+ checkDtxTimeout();
if (fail) {
- accumulatedAck.clear();
dtxBuffer->fail();
} else {
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
dtxBuffer->markEnded();
- }
-
+ }
dtxBuffer.reset();
- txBuffer.reset();
}
void Channel::suspendDtx(const std::string& xid){
@@ -173,8 +171,10 @@ void Channel::suspendDtx(const std::string& xid){
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
% dtxBuffer->getXid() % xid);
}
+ txBuffer.reset();//ops on this channel no longer transactional
+
+ checkDtxTimeout();
dtxBuffer->setSuspended(true);
- txBuffer.reset();
}
void Channel::resumeDtx(const std::string& xid){
@@ -185,10 +185,20 @@ void Channel::resumeDtx(const std::string& xid){
if (!dtxBuffer->isSuspended()) {
throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
}
- dtxBuffer->setSuspended(true);
+
+ checkDtxTimeout();
+ dtxBuffer->setSuspended(false);
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
}
+void Channel::checkDtxTimeout()
+{
+ if (dtxBuffer->isExpired()) {
+ dtxBuffer.reset();
+ throw DtxTimeoutException();
+ }
+}
+
void Channel::deliver(
Message::shared_ptr& msg, const string& consumerTag,
Queue::shared_ptr& queue, bool ackExpected)
@@ -302,9 +312,14 @@ void Channel::ack(uint64_t deliveryTag, bool multiple){
void Channel::ack(uint64_t firstTag, uint64_t lastTag){
if (txBuffer.get()) {
accumulatedAck.update(firstTag, lastTag);
-
//TODO: I think the outstanding prefetch size & count should be updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
+ if (dtxBuffer.get()) {
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+
} else {
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery