From 78d7f0727227f13da826180b2fe98f799160a93a Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 1 Dec 2014 17:41:09 +0000 Subject: QPID-6252: AMQP 1.0 browsing client generates large number of errors on broker. The problem was that messages for browsing receivers were being recorded on the client SessionContext unacked list. This is incorrect since you don't ack browsed messages. They remained on the list after the browsing receiver was closed, and every subsequent call to acknowledge() on the client would attempt to ack these messages for a no-longer-existing link. Fix is to not record browsed messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1642720 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h | 1 + qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 2 +- qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 4 +++- qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h | 2 ++ qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp | 4 ++-- qpid/cpp/src/qpid/messaging/amqp/SessionContext.h | 2 +- 6 files changed, 10 insertions(+), 5 deletions(-) (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h index 3ee58cad8d..66aee1ae22 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -44,6 +44,7 @@ class AddressHelper const qpid::types::Variant::Map& getNodeProperties() const; bool getLinkSource(std::string& out) const; bool getLinkTarget(std::string& out) const; + bool getBrowse() const { return browse; } const qpid::types::Variant::Map& getLinkProperties() const; static std::string getLinkName(const Address& address); private: diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index fedab4286f..9e3f95742b 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -292,7 +292,7 @@ bool ConnectionContext::get(boost::shared_ptr ssn, boost::shared QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); encoded->init(impl); impl.setEncoded(encoded); - impl.setInternalId(ssn->record(current)); + impl.setInternalId(ssn->record(current, lnk->getBrowse())); pn_link_advance(lnk->receiver); if (lnk->capacity) { pn_link_flow(lnk->receiver, 1); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 08cc130a9e..454106149d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -36,7 +36,9 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co address(a), helper(address), receiver(pn_receiver(session, name.c_str())), - capacity(0), used(0) {} + capacity(0), used(0) +{} + ReceiverContext::~ReceiverContext() { //pn_link_free(receiver); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index 2b4e8e1986..8ded487bf3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -60,6 +60,8 @@ class ReceiverContext void verify(); Address getAddress() const; bool hasCurrent(); + bool getBrowse() const { return helper.getBrowse(); } + private: friend class ConnectionContext; const std::string name; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 4e5d71f788..f2b7b24b4c 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -110,10 +110,10 @@ uint32_t SessionContext::getUnsettledAcks() return 0;//TODO } -qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) +qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery, bool browse) { qpid::framing::SequenceNumber id = next++; - unacked[id] = delivery; + if (!browse) unacked[id] = delivery; QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); return id; } diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index 8c2bb040a6..b347c327c5 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -75,7 +75,7 @@ class SessionContext qpid::framing::SequenceNumber next; std::string name; - qpid::framing::SequenceNumber record(pn_delivery_t*); + qpid::framing::SequenceNumber record(pn_delivery_t*, bool browse); void acknowledge(); void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); -- cgit v1.2.1