summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2016-04-28 19:01:29 +0000
committerGordon Sim <gsim@apache.org>2016-04-28 19:01:29 +0000
commit25f4622231d74afbf1dbc2802b26029384c0a9c1 (patch)
tree192d152ddfdf64d60256f2833eb36cae8eab4903
parentdd7f2059359a8b2b4786c2c312d873a9d24d32a2 (diff)
downloadqpid-python-25f4622231d74afbf1dbc2802b26029384c0a9c1.tar.gz
QPID-7234: allow proper credit processing to happen even for expired messages
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1741491 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp8
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h1
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h2
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp12
5 files changed, 19 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 2ca2c85c64..490ad91bfb 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -55,15 +55,17 @@ const std::string EMPTY_STRING;
struct GetNone : IncomingMessages::Handler
{
bool accept(IncomingMessages::MessageTransfer&) { return false; }
+ bool expire(IncomingMessages::MessageTransfer&) { return false; }
};
struct GetAny : IncomingMessages::Handler
{
bool accept(IncomingMessages::MessageTransfer& transfer)
- {
+ {
transfer.retrieve(0);
return true;
}
+ bool expire(IncomingMessages::MessageTransfer&) { return false; }
};
struct MatchAndTrack
@@ -147,7 +149,7 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout)
for (FrameSetQueue::iterator i = received.begin(); i != received.end();)
{
MessageTransfer transfer(*i, *this);
- if (transfer.checkExpired()) {
+ if (transfer.checkExpired() && handler.expire(transfer)) {
i = received.erase(i);
} else if (handler.accept(transfer)) {
received.erase(i);
@@ -282,7 +284,7 @@ IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid:
for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
- if (transfer.checkExpired()) {
+ if (transfer.checkExpired() && handler->expire(transfer)) {
QPID_LOG(debug, "Expired received transfer: " << *content->getMethod());
} else if (handler && handler->accept(transfer)) {
QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index 4c9ee68ece..38c293eefc 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -67,6 +67,7 @@ class IncomingMessages
{
virtual ~Handler() {}
virtual bool accept(MessageTransfer& transfer) = 0;
+ virtual bool expire(MessageTransfer& transfer) = 0;
virtual bool isClosed() { return false; }
};
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index c356bc298b..507b2ddfbf 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -36,7 +36,7 @@ using qpid::messaging::NoMessageAvailable;
using qpid::messaging::Receiver;
using qpid::messaging::Duration;
-void ReceiverImpl::received(qpid::messaging::Message&)
+void ReceiverImpl::received()
{
//TODO: should this be configurable
sys::Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index 0d3366907b..0d36c862e7 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -63,7 +63,7 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
uint32_t getCapacity();
uint32_t getAvailable();
uint32_t getUnsettled();
- void received(qpid::messaging::Message& message);
+ void received();
qpid::messaging::Session getSession() const;
bool isClosed() const;
qpid::messaging::Address getAddress() const;
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 1e2b68b24e..df3d6504d9 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -330,6 +330,16 @@ struct IncomingMessageHandler : IncomingMessages::Handler
return callback(transfer);
}
+ bool expire(IncomingMessages::MessageTransfer& transfer)
+ {
+ if (receiver && receiver->getName() == transfer.getDestination()) {
+ receiver->received();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
bool isClosed()
{
return receiver && receiver->isClosed();
@@ -358,7 +368,7 @@ bool SessionImpl::accept(ReceiverImpl* receiver,
{
if (receiver->getName() == transfer.getDestination()) {
transfer.retrieve(message);
- receiver->received(*message);
+ receiver->received();
return true;
} else {
return false;