From 1dae32d6fd23383f759650607a7cc38e85ac3f79 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 23 Jan 2014 16:05:43 +0000 Subject: QPID-5509: release messages that have not yet been fetched when closing receiver git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560718 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 15 +++++++++++++++ qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 13 ++----------- 2 files changed, 17 insertions(+), 11 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 0fa7bf218f..07367b8aa8 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -333,6 +333,21 @@ void ConnectionContext::detach(boost::shared_ptr ssn, boost::sha void ConnectionContext::detach(boost::shared_ptr ssn, boost::shared_ptr lnk) { qpid::sys::ScopedLock l(lock); + pn_link_drain(lnk->receiver, 0); + wakeupDriver(); + //Not all implementations handle drain correctly, so limit the + //time spent waiting for it + qpid::sys::AbsTime until(qpid::sys::now(), qpid::sys::TIME_SEC*2); + while (pn_link_credit(lnk->receiver) > pn_link_queued(lnk->receiver) && until > qpid::sys::now()) { + QPID_LOG(debug, "Waiting for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); + waitUntil(ssn, lnk, until); + } + //release as yet unfetched messages: + for (pn_delivery_t* d = pn_link_current(lnk->receiver); d; d = pn_link_current(lnk->receiver)) { + pn_link_advance(lnk->receiver); + pn_delivery_update(d, PN_RELEASED); + pn_delivery_settle(d); + } if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) { lnk->close(); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 38b3c07cf5..7eaa1c068b 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -58,21 +58,12 @@ uint32_t ReceiverContext::getCapacity() uint32_t ReceiverContext::getAvailable() { - uint32_t count(0); - for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) { - ++count; - if (d == pn_link_current(receiver)) break; - } - return count; + return pn_link_queued(receiver); } uint32_t ReceiverContext::getUnsettled() { - uint32_t count(0); - for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) { - ++count; - } - return count; + return pn_link_unsettled(receiver) - pn_link_queued(receiver); } void ReceiverContext::close() -- cgit v1.2.1