diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-25 01:55:06 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-25 01:55:06 +0000 |
| commit | 57bd5193208b228c1088586917d7f43f13e0dd9a (patch) | |
| tree | 564d1aa0d13da985bd2159bbdd8d4b92be4016fb /cpp/src/qpid/client/SubscriptionManager.cpp | |
| parent | d1239516d2cd33ceb90be7a74bd5ea73825c577e (diff) | |
| download | qpid-python-57bd5193208b228c1088586917d7f43f13e0dd9a.tar.gz | |
Client API change: Centralize access to subscription status, better control of acquire/accept.
client/AckPolicy: removed, functionality moved to Subscription and SubscriptionSettings
client/SubscriptionSettings: struct aggregates flow control & accept-acquire parameters for subscribe.
client/Subscription: represents active subscription. Query settings, unacked messages, manual accept/acquire
client/SubscriptionManager: use AcceptMode, AcquireMode enums rather than confusing bools.
Issues addressed by the change:
- old use of bool for acceptMode was inverted wrt AMQP enum values, bools are confusing.
- old AckPolicy was broken - not possible to access the instance associated with an active subscription
- old AckPolicy did not provide a way to do manual acquire, only accept.
- setting values on SubscriptionManager to apply to subsequent subscriptions is awkward & error-prone, now can use SubscriptionSettings to control on each subscribe individually.
- a subscription is a central concept in AMQP, it deserves to be a class. Subscription and SubscriptionSettings provides a single point for future expansion of interactions with a a Subscription.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707808 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.cpp')
| -rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 94 |
1 files changed, 29 insertions, 65 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index dde93635c8..7e2f2f8595 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -22,6 +22,7 @@ #define _Subscription_ #include "SubscriptionManager.h" +#include "SubscriptionImpl.h" #include <qpid/client/Dispatcher.h> #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> @@ -34,83 +35,41 @@ namespace qpid { namespace client { SubscriptionManager::SubscriptionManager(const Session& s) - : dispatcher(s), session(s), - flowControl(UNLIMITED, UNLIMITED, false), - acceptMode(0), acquireMode(0), - autoStop(true) + : dispatcher(s), session(s), autoStop(true) {} -void SubscriptionManager::subscribeInternal( - const std::string& q, const std::string& dest, const FlowControl& fc) +Subscription SubscriptionManager::subscribe( + MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { - session.messageSubscribe( - arg::queue=q, arg::destination=dest, - arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); - if (fc.messages || fc.bytes) // No need to set if all 0. - setFlowControl(dest, fc); + std::string name=n.empty() ? q:n; + boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener); + dispatcher.listen(si); + return subscriptions[name] = Subscription(si.get()); } -void SubscriptionManager::subscribe( - MessageListener& listener, const std::string& q, const std::string& d) +Subscription SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { - subscribe(listener, q, getFlowControl(), d); + std::string name=n.empty() ? q:n; + lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name)); + boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0); + lq.subscription = Subscription(si.get()); + return subscriptions[name] = lq.subscription; } -void SubscriptionManager::subscribe( - MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d) +Subscription SubscriptionManager::subscribe( + MessageListener& listener, const std::string& q, const std::string& n) { - std::string dest=d.empty() ? q:d; - dispatcher.listen(dest, &listener, autoAck); - return subscribeInternal(q, dest, fc); + return subscribe(listener, q, defaultSettings, n); } -void SubscriptionManager::subscribe( - LocalQueue& lq, const std::string& q, const std::string& d) +Subscription SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const std::string& n) { - subscribe(lq, q, getFlowControl(), d); + return subscribe(lq, q, defaultSettings, n); } -void SubscriptionManager::subscribe( - LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d) -{ - std::string dest=d.empty() ? q:d; - lq.session=session; - lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest)); - return subscribeInternal(q, dest, fc); -} - -void SubscriptionManager::setFlowControl( - const std::string& dest, uint32_t messages, uint32_t bytes, bool window) -{ - session.messageSetFlowMode(dest, window); - session.messageFlow(dest, 0, messages); - session.messageFlow(dest, 1, bytes); - session.sync(); -} - -void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) { - setFlowControl(dest, fc.messages, fc.bytes, fc.window); -} - -void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; } - -void SubscriptionManager::setFlowControl( - uint32_t messages_, uint32_t bytes_, bool window_) -{ - setFlowControl(FlowControl(messages_, bytes_, window_)); -} - -const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; } - -void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; } - -void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } - -void SubscriptionManager::setAckPolicy(const AckPolicy& a) { autoAck=a; } - -AckPolicy& SubscriptionManager::getAckPolicy() { return autoAck; } - -void SubscriptionManager::cancel(const std::string dest) +void SubscriptionManager::cancel(const std::string& dest) { sync(session).messageCancel(dest); dispatcher.cancel(dest); @@ -138,10 +97,11 @@ void SubscriptionManager::stop() bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) { LocalQueue lq; std::string unique = framing::Uuid(true).str(); - subscribe(lq, queue, FlowControl::messageCredit(1), unique); + subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique); AutoCancel ac(*this, unique); //first wait for message to be delivered if a timeout has been specified - if (timeout && lq.get(result, timeout)) return true; + if (timeout && lq.get(result, timeout)) + return true; //make sure message is not on queue before final check sync(session).messageFlush(unique); return lq.get(result, 0); @@ -149,6 +109,10 @@ bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Du Session SubscriptionManager::getSession() const { return session; } +Subscription SubscriptionManager::getSubscription(const std::string& name) const { + return subscriptions.at(name); +} + void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) { dispatcher.registerFailoverHandler(fh); } |
