diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 35 |
1 files changed, 25 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index e256566d35..c1f0b44ed4 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -38,6 +38,7 @@ #include "Connection.h" #include "DeliverableMessage.h" #include "DtxAck.h" +#include "DtxTimeout.h" #include "MessageStore.h" #include "TxAck.h" #include "TxPublish.h" @@ -154,18 +155,15 @@ void Channel::endDtx(const std::string& xid, bool fail){ % dtxBuffer->getXid() % xid); } + txBuffer.reset();//ops on this channel no longer transactional + + checkDtxTimeout(); if (fail) { - accumulatedAck.clear(); dtxBuffer->fail(); } else { - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); dtxBuffer->markEnded(); - } - + } dtxBuffer.reset(); - txBuffer.reset(); } void Channel::suspendDtx(const std::string& xid){ @@ -173,8 +171,10 @@ void Channel::suspendDtx(const std::string& xid){ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") % dtxBuffer->getXid() % xid); } + txBuffer.reset();//ops on this channel no longer transactional + + checkDtxTimeout(); dtxBuffer->setSuspended(true); - txBuffer.reset(); } void Channel::resumeDtx(const std::string& xid){ @@ -185,10 +185,20 @@ void Channel::resumeDtx(const std::string& xid){ if (!dtxBuffer->isSuspended()) { throw ConnectionException(503, boost::format("xid %1% not suspended")% xid); } - dtxBuffer->setSuspended(true); + + checkDtxTimeout(); + dtxBuffer->setSuspended(false); txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); } +void Channel::checkDtxTimeout() +{ + if (dtxBuffer->isExpired()) { + dtxBuffer.reset(); + throw DtxTimeoutException(); + } +} + void Channel::deliver( Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected) @@ -302,9 +312,14 @@ void Channel::ack(uint64_t deliveryTag, bool multiple){ void Channel::ack(uint64_t firstTag, uint64_t lastTag){ if (txBuffer.get()) { accumulatedAck.update(firstTag, lastTag); - //TODO: I think the outstanding prefetch size & count should be updated at this point... //TODO: ...this may then necessitate dispatching to consumers + if (dtxBuffer.get()) { + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery |
