summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-08-28 12:41:23 +0000
committerGordon Sim <gsim@apache.org>2013-08-28 12:41:23 +0000
commitc29ed9b3d6d5bc2f772b7700166fa11d138ae3ec (patch)
tree3d1bf0d81859f786a31b68eae7a3cceed2e8b9aa /qpid/cpp/src
parentaa1a31f789ac361914b3859c0e0b7652955f1caf (diff)
downloadqpid-python-c29ed9b3d6d5bc2f772b7700166fa11d138ae3ec.tar.gz
QPID-4948: enable browsing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518181 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp6
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp3
4 files changed, 10 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index eb18582b4e..86fe34d8d3 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -44,9 +44,10 @@ void Outgoing::wakeup()
session.wakeup();
}
-OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool e, bool p)
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session,
+ qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p)
: Outgoing(broker, session, source, target, pn_link_name(l)),
- Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
+ Consumer(pn_link_name(l), type),
exclusive(e),
isControllingUser(p),
queue(q), deliveries(5000), link(l), out(o),
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index 86d7d46111..f0f2226e10 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -88,7 +88,8 @@ class Outgoing : public ManagedOutgoingLink
class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
{
public:
- OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool exclusive, bool isControllingUser);
+ OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
+ qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser);
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
void init();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index cd1c21c40e..17d7560e75 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -36,6 +36,7 @@
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/Selector.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/amqp/Filter.h"
@@ -319,7 +320,8 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
if (node.queue) {
authorise.outgoing(node.queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false, node.properties.trackControllingLink()));
+ SubscriptionType type = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY ? BROWSER : CONSUMER;
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink()));
q->init();
filter.apply(q);
outgoing[link] = q;
@@ -354,7 +356,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
if (!shared) queue->setExclusiveOwner(this);
authorise.outgoing(node.exchange, queue, filter);
filter.bind(node.exchange, queue);
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared, false));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, CONSUMER, !shared, false));
q->init();
outgoing[link] = q;
} else if (node.relay) {
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 4d37d3169e..2a358a99f7 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -300,7 +300,6 @@ AddressHelper::AddressHelper(const Address& address) :
if (bind(address, MODE, mode)) {
if (mode == BROWSE) {
browse = true;
- throw qpid::messaging::AddressError("Browse mode not yet supported over AMQP 1.0.");
} else if (mode != CONSUME) {
throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'.");
}
@@ -560,7 +559,7 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
if (mode == FOR_RECEIVER) {
if (timeout) pn_terminus_set_timeout(terminus, timeout);
if (browse) {
- //when PROTON-139 is resolved, set the required delivery-mode
+ pn_terminus_set_distribution_mode(terminus, PN_DIST_MODE_COPY);
}
//set filter(s):
if (!filters.empty()) {