From 149d2d80191504cada7cb46454d5a364c1131a82 Mon Sep 17 00:00:00 2001 From: Pavel Moravec Date: Thu, 3 Jul 2014 12:56:51 +0000 Subject: [QPID-5866]: [C++ client] AMQP 1.0 closing session without closing receiver first marks further messages as redelivered (previous commit not complete) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1607628 13f79535-47bb-0310-9956-ffa450edef68 --- .../cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 19 ++++++++++--------- qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 1 + 2 files changed, 11 insertions(+), 9 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index d29b2eae6f..9ed3713920 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -140,13 +140,9 @@ void ConnectionContext::endSession(boost::shared_ptr ssn) qpid::sys::ScopedLock l(lock); if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { //explicitly release messages that have yet to be fetched - for (boost::shared_ptr lnk = ssn->nextReceiver(); lnk != boost::shared_ptr(); lnk = ssn->nextReceiver()) { - for (pn_delivery_t* d = pn_link_current(lnk->receiver); d; d = pn_link_current(lnk->receiver)) { - pn_link_advance(lnk->receiver); - pn_delivery_update(d, PN_RELEASED); - pn_delivery_settle(d); - } - } + for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) { + drain_and_release_messages(ssn, i->second); + } //wait for outstanding sends to settle while (!ssn->settled()) { QPID_LOG(debug, "Waiting for sends to settle before closing"); @@ -338,9 +334,8 @@ void ConnectionContext::detach(boost::shared_ptr ssn, boost::sha ssn->removeSender(lnk->getName()); } -void ConnectionContext::detach(boost::shared_ptr ssn, boost::shared_ptr lnk) +void ConnectionContext::drain_and_release_messages(boost::shared_ptr ssn, boost::shared_ptr lnk) { - qpid::sys::ScopedLock l(lock); pn_link_drain(lnk->receiver, 0); wakeupDriver(); //Not all implementations handle drain correctly, so limit the @@ -356,6 +351,12 @@ void ConnectionContext::detach(boost::shared_ptr ssn, boost::sha pn_delivery_update(d, PN_RELEASED); pn_delivery_settle(d); } +} + +void ConnectionContext::detach(boost::shared_ptr ssn, boost::shared_ptr lnk) +{ + qpid::sys::ScopedLock l(lock); + drain_and_release_messages(ssn, lnk); if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) { lnk->close(); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 59270f445d..651cb736fd 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -78,6 +78,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void attach(boost::shared_ptr, boost::shared_ptr); void detach(boost::shared_ptr, boost::shared_ptr); void detach(boost::shared_ptr, boost::shared_ptr); + void drain_and_release_messages(boost::shared_ptr, boost::shared_ptr); bool isClosed(boost::shared_ptr, boost::shared_ptr); void send(boost::shared_ptr, boost::shared_ptr ctxt, const qpid::messaging::Message& message, bool sync); bool fetch(boost::shared_ptr ssn, boost::shared_ptr lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); -- cgit v1.2.1