diff options
| author | Gordon Sim <gsim@apache.org> | 2013-08-28 12:41:23 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-08-28 12:41:23 +0000 |
| commit | c29ed9b3d6d5bc2f772b7700166fa11d138ae3ec (patch) | |
| tree | 3d1bf0d81859f786a31b68eae7a3cceed2e8b9aa /qpid/cpp/src | |
| parent | aa1a31f789ac361914b3859c0e0b7652955f1caf (diff) | |
| download | qpid-python-c29ed9b3d6d5bc2f772b7700166fa11d138ae3ec.tar.gz | |
QPID-4948: enable browsing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 3 |
4 files changed, 10 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index eb18582b4e..86fe34d8d3 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -44,9 +44,10 @@ void Outgoing::wakeup() session.wakeup(); } -OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e, bool p) +OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, + qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p) : Outgoing(broker, session, source, target, pn_link_name(l)), - Consumer(pn_link_name(l), /*FIXME*/CONSUMER), + Consumer(pn_link_name(l), type), exclusive(e), isControllingUser(p), queue(q), deliveries(5000), link(l), out(o), diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 86d7d46111..f0f2226e10 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -88,7 +88,8 @@ class Outgoing : public ManagedOutgoingLink class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue> { public: - OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive, bool isControllingUser); + OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, + qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); void init(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index cd1c21c40e..17d7560e75 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -36,6 +36,7 @@ #include "qpid/broker/TopicExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueCursor.h" #include "qpid/broker/Selector.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/amqp/Filter.h" @@ -319,7 +320,8 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (node.queue) { authorise.outgoing(node.queue); - boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false, node.properties.trackControllingLink())); + SubscriptionType type = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY ? BROWSER : CONSUMER; + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink())); q->init(); filter.apply(q); outgoing[link] = q; @@ -354,7 +356,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (!shared) queue->setExclusiveOwner(this); authorise.outgoing(node.exchange, queue, filter); filter.bind(node.exchange, queue); - boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared, false)); + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, CONSUMER, !shared, false)); q->init(); outgoing[link] = q; } else if (node.relay) { diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 4d37d3169e..2a358a99f7 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -300,7 +300,6 @@ AddressHelper::AddressHelper(const Address& address) : if (bind(address, MODE, mode)) { if (mode == BROWSE) { browse = true; - throw qpid::messaging::AddressError("Browse mode not yet supported over AMQP 1.0."); } else if (mode != CONSUME) { throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'."); } @@ -560,7 +559,7 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode) if (mode == FOR_RECEIVER) { if (timeout) pn_terminus_set_timeout(terminus, timeout); if (browse) { - //when PROTON-139 is resolved, set the required delivery-mode + pn_terminus_set_distribution_mode(terminus, PN_DIST_MODE_COPY); } //set filter(s): if (!filters.empty()) { |
