summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-07-18 19:50:09 +0000
committerGordon Sim <gsim@apache.org>2013-07-18 19:50:09 +0000
commit1fd1320082f0cc0aae0c94a7cbd3a0cc69762a71 (patch)
treed257505174ed6f18b7ed98bf506478c0be98be46 /qpid/cpp
parent3695f380f3d9635e05561f37033e18d6804c69a6 (diff)
downloadqpid-python-1fd1320082f0cc0aae0c94a7cbd3a0cc69762a71.tar.gz
QPID-5003: set finite lifetime by default for durable subscription queues that are not currently in use
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1504622 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueueSettings.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp6
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp16
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp53
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h1
5 files changed, 57 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
index 30e4bb8fca..c505217dbb 100644
--- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
@@ -173,6 +173,7 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v
return true;
} else if (key == AUTO_DELETE_TIMEOUT) {
autoDeleteDelay = value;
+ if (autoDeleteDelay) autodelete = true;
return true;
} else if (key == QueueFlowLimit::flowStopCountKey) {
flowStop.setCount(value);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index e6ea694d54..1d6c835b04 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -333,8 +333,12 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
settings.durable = durable;
settings.autodelete = !durable;
}
+ settings.autoDeleteDelay = pn_terminus_get_timeout(source);
+ if (settings.autoDeleteDelay) {
+ settings.autodelete = true;
+ settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay;
+ }
filter.configure(settings);
- //TODO: populate settings from source details when available from engine
std::stringstream queueName;
if (shared) {
//just use link name (TODO: could allow this to be
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index 6d87e1362a..872e1d59df 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -89,6 +89,7 @@ const std::string NODE("node");
const std::string LINK("link");
const std::string MODE("mode");
const std::string RELIABILITY("reliability");
+const std::string TIMEOUT("timeout");
const std::string NAME("name");
const std::string DURABLE("durable");
const std::string X_DECLARE("x-declare");
@@ -517,14 +518,24 @@ Subscription::Subscription(const Address& address, const std::string& type)
: Exchange(address),
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
durable(Opt(address)/LINK/DURABLE),
- //if the link is durable, then assume it is also reliable unless expclitly stated otherwise
- //if not assume it is unreliable unless expclitly stated otherwise
+ //if the link is durable, then assume it is also reliable unless explicitly stated otherwise
+ //if not assume it is unreliable unless explicitly stated otherwise
reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)),
actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)),
alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str())
{
+ const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
+ if (timeout) {
+ if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32());
+ } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) {
+ //if durable but not explicitly reliable, then set a non-zero
+ //default for the autodelete timeout (previously this would
+ //have defaulted to autodelete immediately anyway, so the risk
+ //of the change causing problems is mitigated)
+ queueOptions.setInt("qpid.auto_delete_delay", 15*60);
+ }
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
std::string selector = Opt(address)/LINK/SELECTOR;
@@ -999,6 +1010,7 @@ Verifier::Verifier()
link[NAME] = true;
link[DURABLE] = true;
link[RELIABILITY] = true;
+ link[TIMEOUT] = true;
link[X_SUBSCRIBE] = true;
link[X_DECLARE] = true;
link[X_BINDINGS] = true;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 9ecb46d872..511be7d6fb 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -57,6 +57,7 @@ const std::string PROPERTIES("properties");
const std::string MODE("mode");
const std::string BROWSE("browse");
const std::string CONSUME("consume");
+const std::string TIMEOUT("timeout");
const std::string TYPE("type");
const std::string TOPIC("topic");
@@ -144,6 +145,16 @@ bool test(const Variant::Map& options, const std::string& name)
}
}
+template <typename T> T get(const Variant::Map& options, const std::string& name, T defaultValue)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return defaultValue;
+ } else {
+ return j->second;
+ }
+}
+
bool bind(const Variant::Map& options, const std::string& name, std::string& variable)
{
Variant::Map::const_iterator j = options.find(name);
@@ -260,6 +271,8 @@ void write(pn_data_t* data, const Variant& value)
break;
}
}
+const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
+const uint32_t DEFAULT_TIMEOUT(0);
}
AddressHelper::AddressHelper(const Address& address) :
@@ -268,6 +281,7 @@ AddressHelper::AddressHelper(const Address& address) :
type(address.getType()),
durableNode(false),
durableLink(false),
+ timeout(0),
browse(false)
{
verifier.verify(address);
@@ -281,6 +295,7 @@ AddressHelper::AddressHelper(const Address& address) :
bind(node, CAPABILITIES, capabilities);
durableNode = test(node, DURABLE);
durableLink = test(link, DURABLE);
+ timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
std::string mode;
if (bind(address, MODE, mode)) {
if (mode == BROWSE) {
@@ -521,27 +536,30 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
if (durableLink) {
pn_terminus_set_durability(terminus, PN_DELIVERIES);
}
- if (mode == FOR_RECEIVER && browse) {
- //when PROTON-139 is resolved, set the required delivery-mode
- }
- //set filter(s):
- if (mode == FOR_RECEIVER && !filters.empty()) {
- pn_data_t* filter = pn_terminus_filter(terminus);
- pn_data_put_map(filter);
- pn_data_enter(filter);
- for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
- pn_data_put_symbol(filter, convert(i->name));
- pn_data_put_described(filter);
+ if (mode == FOR_RECEIVER) {
+ if (timeout) pn_terminus_set_timeout(terminus, timeout);
+ if (browse) {
+ //when PROTON-139 is resolved, set the required delivery-mode
+ }
+ //set filter(s):
+ if (!filters.empty()) {
+ pn_data_t* filter = pn_terminus_filter(terminus);
+ pn_data_put_map(filter);
pn_data_enter(filter);
- if (i->descriptorSymbol.size()) {
- pn_data_put_symbol(filter, convert(i->descriptorSymbol));
- } else {
- pn_data_put_ulong(filter, i->descriptorCode);
+ for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) {
+ pn_data_put_symbol(filter, convert(i->name));
+ pn_data_put_described(filter);
+ pn_data_enter(filter);
+ if (i->descriptorSymbol.size()) {
+ pn_data_put_symbol(filter, convert(i->descriptorSymbol));
+ } else {
+ pn_data_put_ulong(filter, i->descriptorCode);
+ }
+ write(filter, i->value);
+ pn_data_exit(filter);
}
- write(filter, i->value);
pn_data_exit(filter);
}
- pn_data_exit(filter);
}
}
@@ -632,6 +650,7 @@ Verifier::Verifier()
link[NAME] = true;
link[DURABLE] = true;
link[RELIABILITY] = true;
+ link[TIMEOUT] = true;
link[X_SUBSCRIBE] = true;
link[X_DECLARE] = true;
link[X_BINDINGS] = true;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
index 4608981a69..581eec5e3b 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -68,6 +68,7 @@ class AddressHelper
std::string type;
bool durableNode;
bool durableLink;
+ uint32_t timeout;
bool browse;
std::vector<Filter> filters;