From 359f60c318627900c3ac216496486c42d1a4df8a Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 5 Nov 2008 21:12:54 +0000 Subject: Added ability to release messages through the Subscription class (+test) Added another mode for managing completion (+test) Fixed regression where bytes credit was not reallocated in windowing mode after an accept/release Fixed regression where subscribe request is issued before listener is registered with dispatcher git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@711698 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/ClientSessionTest.cpp | 84 +++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) (limited to 'cpp/src/tests/ClientSessionTest.cpp') diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index abe317aad8..cca16bd9f8 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -296,6 +296,90 @@ QPID_AUTO_TEST_CASE(testExpirationOnPop) { } } +QPID_AUTO_TEST_CASE(testRelease) { + ClientSessionFixture fix; + + const uint count=10; + for (uint i = 0; i < count; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + fix.session.messageTransfer(arg::content=m); + } + + fix.subs.setAutoStop(false); + sys::Thread runner(fix.subs);//start dispatcher thread + SubscriptionSettings settings; + settings.autoAck = 0; + + SimpleListener l1; + Subscription s1 = fix.subs.subscribe(l1, "my-queue", settings); + l1.waitFor(count); + s1.cancel(); + + for (uint i = 0; i < count; i++) { + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l1.messages[i].getData()); + } + s1.release(s1.getUnaccepted()); + + //check that released messages are redelivered + settings.autoAck = 1; + SimpleListener l2; + Subscription s2 = fix.subs.subscribe(l2, "my-queue", settings); + l2.waitFor(count); + for (uint i = 0; i < count; i++) { + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), l2.messages[i].getData()); + } + + fix.subs.stop(); + runner.join(); + fix.session.close(); +} + +QPID_AUTO_TEST_CASE(testCompleteOnAccept) { + ClientSessionFixture fix; + + fix.session.queueDeclare(arg::queue="HELP_FIND_ME"); + + const uint count = 8; + const uint chunk = 4; + for (uint i = 0; i < count; i++) { + Message m((boost::format("Message_%1%") % (i+1)).str(), "my-queue"); + fix.session.messageTransfer(arg::content=m); + } + + SubscriptionSettings settings; + settings.autoAck = 0; + settings.completionMode = COMPLETE_ON_ACCEPT; + settings.flowControl = FlowControl::messageWindow(chunk); + + LocalQueue q; + Subscription s = fix.subs.subscribe(q, "my-queue", settings); + fix.session.messageFlush(arg::destination=s.getName()); + SequenceSet accepted; + for (uint i = 0; i < chunk; i++) { + Message m; + BOOST_CHECK(q.get(m)); + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); + accepted.add(m.getId()); + } + Message m; + BOOST_CHECK(!q.get(m)); + + s.accept(accepted); + fix.session.messageFlush(arg::destination=s.getName()); + accepted.clear(); + + for (uint i = chunk; i < count; i++) { + Message m; + BOOST_CHECK(q.get(m)); + BOOST_CHECK_EQUAL((boost::format("Message_%1%") % (i+1)).str(), m.getData()); + accepted.add(m.getId()); + } + fix.session.messageAccept(accepted); + + fix.session.queueDelete(arg::queue="HELP_FIND_ME"); + +} + QPID_AUTO_TEST_SUITE_END() -- cgit v1.2.1