summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-06-13 16:37:10 +0000
committerGordon Sim <gsim@apache.org>2011-06-13 16:37:10 +0000
commit724410ead77e3cb8461ab61ab66b46754129edc9 (patch)
tree8f949ef342f64778fd4c0ce1984d231e8066091d /cpp/src/qpid
parent7674a415e10a512b819d6f616fdb5321b68d8982 (diff)
downloadqpid-python-724410ead77e3cb8461ab61ab66b46754129edc9.tar.gz
QPID-3225: Allow exclusivity (for subscription queue and subscription itself) to be controlled through address properties.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1135165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp21
1 files changed, 16 insertions, 5 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index f1295a3b66..9cf5f31290 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -233,6 +233,8 @@ class Subscription : public Exchange, public MessageSource
const bool reliable;
const bool durable;
const std::string actualType;
+ const bool exclusiveQueue;
+ const bool exclusiveSubscription;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
@@ -307,6 +309,7 @@ struct Opt
Opt& operator/(const std::string& name);
operator bool() const;
std::string str() const;
+ bool asBool(bool defaultValue) const;
const Variant::List& asList() const;
void collect(qpid::framing::FieldTable& args) const;
@@ -338,6 +341,12 @@ Opt::operator bool() const
return value && !value->isVoid() && value->asBool();
}
+bool Opt::asBool(bool defaultValue) const
+{
+ if (value) return value->asBool();
+ else return defaultValue;
+}
+
std::string Opt::str() const
{
if (value) return value->asString();
@@ -490,7 +499,9 @@ Subscription::Subscription(const Address& address, const std::string& type)
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
durable(Opt(address)/LINK/DURABLE),
- actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
+ exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
+ exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -550,7 +561,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
checkAssert(session, FOR_RECEIVER);
//create subscription queue:
- session.queueDeclare(arg::queue=queue, arg::exclusive=true,
+ session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
//'default' binding:
bindings.bind(session);
@@ -559,15 +570,15 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str
linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
- session.messageSubscribe(arg::queue=queue, arg::destination=destination,
- arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
linkBindings.unbind(session);
session.messageCancel(destination);
- session.queueDelete(arg::queue=queue);
+ if (reliable) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
checkDelete(session, FOR_RECEIVER);
}