summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2016-04-29 14:16:42 +0000
committerGordon Sim <gsim@apache.org>2016-04-29 14:16:42 +0000
commitab129de8ffa271b543ca03a4e25026ffb8d65d2c (patch)
tree2a118d38d64e4ec55562ac98881dc7f4f6c9293b
parent2a8f44b4f4d015c15d8f4d35b14ae38e3cad6309 (diff)
downloadqpid-python-ab129de8ffa271b543ca03a4e25026ffb8d65d2c.tar.gz
QPID-7240: use protper cursor type when purging priority levels
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1741635 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp2
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp40
2 files changed, 41 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
index 5e60fe5cce..58764637b1 100644
--- a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -77,7 +77,7 @@ Message* PriorityQueue::next(QueueCursor& cursor)
{
boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(cursor.context);
if (!ctxt) {
- ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, CONSUMER));
+ ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, cursor.type));
cursor.context = ctxt;
}
if (cursor.type == REPLICATOR) {
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index fe9f6d2665..6bc43bc8e1 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -1562,6 +1562,46 @@ QPID_AUTO_TEST_CASE(testClientExpiration)
BOOST_CHECK_EQUAL(b_count, 50);
}
+QPID_AUTO_TEST_CASE(testPriorityRingEviction)
+{
+ MessagingFixture fix;
+ std::string queue("queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{qpid.priorities:10, qpid.max_count:5, qpid.policy_type:ring}}}}");
+ Sender sender = fix.session.createSender(queue);
+ Receiver receiver = fix.session.createReceiver(queue);
+ std::vector<Message> acquired;
+ for (uint i = 0; i < 5; ++i) {
+ Message msg((boost::format("msg_%1%") % (i+1)).str());
+ sender.send(msg);
+ }
+ //fetch but don't acknowledge messages, leaving them in acquired state
+ for (uint i = 0; i < 5; ++i) {
+ Message msg;
+ BOOST_CHECK(receiver.fetch(msg, Duration::IMMEDIATE));
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("msg_%1%") % (i+1)).str());
+ acquired.push_back(msg);
+ }
+ //send 5 more messages to the queue, which should cause all the
+ //acquired messages to be dropped
+ for (uint i = 5; i < 10; ++i) {
+ Message msg((boost::format("msg_%1%") % (i+1)).str());
+ sender.send(msg);
+ }
+ //now release the acquired messages, which should have been evicted...
+ for (std::vector<Message>::iterator i = acquired.begin(); i != acquired.end(); ++i) {
+ fix.session.release(*i);
+ }
+ acquired.clear();
+ //and check that the newest five are received
+ for (uint i = 5; i < 10; ++i) {
+ Message msg;
+ BOOST_CHECK(receiver.fetch(msg, Duration::IMMEDIATE));
+ BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("msg_%1%") % (i+1)).str());
+ acquired.push_back(msg);
+ }
+ Message msg;
+ BOOST_CHECK(!receiver.fetch(msg, Duration::IMMEDIATE));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests