diff options
author | Gordon Sim <gsim@apache.org> | 2016-04-28 19:01:29 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2016-04-28 19:01:29 +0000 |
commit | 25f4622231d74afbf1dbc2802b26029384c0a9c1 (patch) | |
tree | 192d152ddfdf64d60256f2833eb36cae8eab4903 | |
parent | dd7f2059359a8b2b4786c2c312d873a9d24d32a2 (diff) | |
download | qpid-python-25f4622231d74afbf1dbc2802b26029384c0a9c1.tar.gz |
QPID-7234: allow proper credit processing to happen even for expired messages
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1741491 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 19 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 2ca2c85c64..490ad91bfb 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -55,15 +55,17 @@ const std::string EMPTY_STRING; struct GetNone : IncomingMessages::Handler { bool accept(IncomingMessages::MessageTransfer&) { return false; } + bool expire(IncomingMessages::MessageTransfer&) { return false; } }; struct GetAny : IncomingMessages::Handler { bool accept(IncomingMessages::MessageTransfer& transfer) - { + { transfer.retrieve(0); return true; } + bool expire(IncomingMessages::MessageTransfer&) { return false; } }; struct MatchAndTrack @@ -147,7 +149,7 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout) for (FrameSetQueue::iterator i = received.begin(); i != received.end();) { MessageTransfer transfer(*i, *this); - if (transfer.checkExpired()) { + if (transfer.checkExpired() && handler.expire(transfer)) { i = received.erase(i); } else if (handler.accept(transfer)) { received.erase(i); @@ -282,7 +284,7 @@ IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid: for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { if (content->isA<MessageTransferBody>()) { MessageTransfer transfer(content, *this); - if (transfer.checkExpired()) { + if (transfer.checkExpired() && handler->expire(transfer)) { QPID_LOG(debug, "Expired received transfer: " << *content->getMethod()); } else if (handler && handler->accept(transfer)) { QPID_LOG(debug, "Delivered " << *content->getMethod() << " " diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index 4c9ee68ece..38c293eefc 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -67,6 +67,7 @@ class IncomingMessages { virtual ~Handler() {} virtual bool accept(MessageTransfer& transfer) = 0; + virtual bool expire(MessageTransfer& transfer) = 0; virtual bool isClosed() { return false; } }; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index c356bc298b..507b2ddfbf 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -36,7 +36,7 @@ using qpid::messaging::NoMessageAvailable; using qpid::messaging::Receiver; using qpid::messaging::Duration; -void ReceiverImpl::received(qpid::messaging::Message&) +void ReceiverImpl::received() { //TODO: should this be configurable sys::Mutex::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index 0d3366907b..0d36c862e7 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -63,7 +63,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl uint32_t getCapacity(); uint32_t getAvailable(); uint32_t getUnsettled(); - void received(qpid::messaging::Message& message); + void received(); qpid::messaging::Session getSession() const; bool isClosed() const; qpid::messaging::Address getAddress() const; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 1e2b68b24e..df3d6504d9 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -330,6 +330,16 @@ struct IncomingMessageHandler : IncomingMessages::Handler return callback(transfer); } + bool expire(IncomingMessages::MessageTransfer& transfer) + { + if (receiver && receiver->getName() == transfer.getDestination()) { + receiver->received(); + return true; + } else { + return false; + } + } + bool isClosed() { return receiver && receiver->isClosed(); @@ -358,7 +368,7 @@ bool SessionImpl::accept(ReceiverImpl* receiver, { if (receiver->getName() == transfer.getDestination()) { transfer.retrieve(message); - receiver->received(*message); + receiver->received(); return true; } else { return false; |