From 57bd5193208b228c1088586917d7f43f13e0dd9a Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Sat, 25 Oct 2008 01:55:06 +0000 Subject: Client API change: Centralize access to subscription status, better control of acquire/accept. client/AckPolicy: removed, functionality moved to Subscription and SubscriptionSettings client/SubscriptionSettings: struct aggregates flow control & accept-acquire parameters for subscribe. client/Subscription: represents active subscription. Query settings, unacked messages, manual accept/acquire client/SubscriptionManager: use AcceptMode, AcquireMode enums rather than confusing bools. Issues addressed by the change: - old use of bool for acceptMode was inverted wrt AMQP enum values, bools are confusing. - old AckPolicy was broken - not possible to access the instance associated with an active subscription - old AckPolicy did not provide a way to do manual acquire, only accept. - setting values on SubscriptionManager to apply to subsequent subscriptions is awkward & error-prone, now can use SubscriptionSettings to control on each subscribe individually. - a subscription is a central concept in AMQP, it deserves to be a class. Subscription and SubscriptionSettings provides a single point for future expansion of interactions with a a Subscription. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707808 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/ClientSessionTest.cpp | 56 ++++++++++--------------------------- 1 file changed, 15 insertions(+), 41 deletions(-) (limited to 'cpp/src/tests/ClientSessionTest.cpp') diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 440605a2e4..abe317aad8 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -20,8 +20,7 @@ */ #include "unit_test.h" #include "BrokerFixture.h" -#include "qpid/client/AckPolicy.h" -#include "qpid/client/Dispatcher.h" +#include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" @@ -52,22 +51,22 @@ struct DummyListener : public sys::Runnable, public MessageListener { std::vector messages; string name; uint expected; - Dispatcher dispatcher; + SubscriptionManager submgr; DummyListener(Session& session, const string& n, uint ex) : - name(n), expected(ex), dispatcher(session) {} + name(n), expected(ex), submgr(session) {} void run() { - dispatcher.listen(name, this); - dispatcher.run(); + submgr.subscribe(*this, name); + submgr.run(); } void received(Message& msg) { messages.push_back(msg); if (--expected == 0) { - dispatcher.stop(); + submgr.stop(); } } }; @@ -95,53 +94,30 @@ struct SimpleListener : public MessageListener struct ClientSessionFixture : public ProxySessionFixture { - ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) {} - - void declareSubscribe(const string& q="my-queue", - const string& dest="my-dest") - { - session.queueDeclare(arg::queue=q); - session.messageSubscribe(arg::queue=q, arg::destination=dest, arg::acquireMode=1); - session.messageFlow(arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);//messages - session.messageFlow(arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);//bytes + ClientSessionFixture(Broker::Options opts = Broker::Options()) : ProxySessionFixture(opts) { + session.queueDeclare(arg::queue="my-queue"); } }; QPID_AUTO_TEST_CASE(testQueueQuery) { ClientSessionFixture fix; fix.session = fix.connection.newSession(); - fix.session.queueDeclare(arg::queue="my-queue", arg::alternateExchange="amq.fanout", arg::exclusive=true, arg::autoDelete=true); - QueueQueryResult result = fix.session.queueQuery(string("my-queue")); + fix.session.queueDeclare(arg::queue="q", arg::alternateExchange="amq.fanout", + arg::exclusive=true, arg::autoDelete=true); + QueueQueryResult result = fix.session.queueQuery("q"); BOOST_CHECK_EQUAL(false, result.getDurable()); BOOST_CHECK_EQUAL(true, result.getExclusive()); - BOOST_CHECK_EQUAL(string("amq.fanout"), - result.getAlternateExchange()); -} - -QPID_AUTO_TEST_CASE(testTransfer) -{ - ClientSessionFixture fix; - fix.session=fix.connection.newSession(); - fix.declareSubscribe(); - fix.session.messageTransfer(arg::acceptMode=1, arg::content=TransferContent("my-message", "my-queue")); - //get & test the message: - FrameSet::shared_ptr msg = fix.session.get(); - BOOST_CHECK(msg->isA()); - BOOST_CHECK_EQUAL(string("my-message"), msg->getContent()); - //confirm receipt: - AckPolicy autoAck; - autoAck.ack(Message(*msg), fix.session); + BOOST_CHECK_EQUAL("amq.fanout", result.getAlternateExchange()); } QPID_AUTO_TEST_CASE(testDispatcher) { ClientSessionFixture fix; fix.session =fix.connection.newSession(); - fix.declareSubscribe(); size_t count = 100; for (size_t i = 0; i < count; ++i) fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast(i), "my-queue")); - DummyListener listener(fix.session, "my-dest", count); + DummyListener listener(fix.session, "my-queue", count); listener.run(); BOOST_CHECK_EQUAL(count, listener.messages.size()); for (size_t i = 0; i < count; ++i) @@ -152,9 +128,8 @@ QPID_AUTO_TEST_CASE(testDispatcherThread) { ClientSessionFixture fix; fix.session =fix.connection.newSession(); - fix.declareSubscribe(); size_t count = 10; - DummyListener listener(fix.session, "my-dest", count); + DummyListener listener(fix.session, "my-queue", count); sys::Thread t(listener); for (size_t i = 0; i < count; ++i) { fix.session.messageTransfer(arg::content=TransferContent(boost::lexical_cast(i), "my-queue")); @@ -190,7 +165,6 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testSuspendResume, 1) { ClientSessionFixture fix; fix.session.timeout(60); - fix.declareSubscribe(); fix.session.suspend(); // Make sure we are still subscribed after resume. fix.connection.resume(fix.session); @@ -234,7 +208,7 @@ QPID_AUTO_TEST_CASE(testLocalQueue) { BOOST_CHECK_EQUAL("foo0", lq.pop().getData()); BOOST_CHECK_EQUAL("foo1", lq.pop().getData()); BOOST_CHECK(lq.empty()); // Credit exhausted. - fix.subs.setFlowControl("lq", FlowControl::unlimited()); + fix.subs.getSubscription("lq").setFlowControl(FlowControl::unlimited()); BOOST_CHECK_EQUAL("foo2", lq.pop().getData()); } -- cgit v1.2.1