summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-01-23 16:05:43 +0000
committerGordon Sim <gsim@apache.org>2014-01-23 16:05:43 +0000
commit1dae32d6fd23383f759650607a7cc38e85ac3f79 (patch)
tree43d07c182f111cbc0b821779817ebf07a307635d /qpid/cpp
parentc50c6979268636f521045ae656bbf75a7746e1ad (diff)
downloadqpid-python-1dae32d6fd23383f759650607a7cc38e85ac3f79.tar.gz
QPID-5509: release messages that have not yet been fetched when closing receiver
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560718 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp15
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp13
2 files changed, 17 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 0fa7bf218f..07367b8aa8 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -333,6 +333,21 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha
void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ pn_link_drain(lnk->receiver, 0);
+ wakeupDriver();
+ //Not all implementations handle drain correctly, so limit the
+ //time spent waiting for it
+ qpid::sys::AbsTime until(qpid::sys::now(), qpid::sys::TIME_SEC*2);
+ while (pn_link_credit(lnk->receiver) > pn_link_queued(lnk->receiver) && until > qpid::sys::now()) {
+ QPID_LOG(debug, "Waiting for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
+ waitUntil(ssn, lnk, until);
+ }
+ //release as yet unfetched messages:
+ for (pn_delivery_t* d = pn_link_current(lnk->receiver); d; d = pn_link_current(lnk->receiver)) {
+ pn_link_advance(lnk->receiver);
+ pn_delivery_update(d, PN_RELEASED);
+ pn_delivery_settle(d);
+ }
if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) {
lnk->close();
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 38b3c07cf5..7eaa1c068b 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -58,21 +58,12 @@ uint32_t ReceiverContext::getCapacity()
uint32_t ReceiverContext::getAvailable()
{
- uint32_t count(0);
- for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) {
- ++count;
- if (d == pn_link_current(receiver)) break;
- }
- return count;
+ return pn_link_queued(receiver);
}
uint32_t ReceiverContext::getUnsettled()
{
- uint32_t count(0);
- for (pn_delivery_t* d = pn_unsettled_head(receiver); d; d = pn_unsettled_next(d)) {
- ++count;
- }
- return count;
+ return pn_link_unsettled(receiver) - pn_link_queued(receiver);
}
void ReceiverContext::close()