diff options
| author | Gordon Sim <gsim@apache.org> | 2013-07-18 19:50:09 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-07-18 19:50:09 +0000 |
| commit | 1fd1320082f0cc0aae0c94a7cbd3a0cc69762a71 (patch) | |
| tree | d257505174ed6f18b7ed98bf506478c0be98be46 /qpid/cpp | |
| parent | 3695f380f3d9635e05561f37033e18d6804c69a6 (diff) | |
| download | qpid-python-1fd1320082f0cc0aae0c94a7cbd3a0cc69762a71.tar.gz | |
QPID-5003: set finite lifetime by default for durable subscription queues that are not currently in use
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1504622 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 16 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 53 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h | 1 |
5 files changed, 57 insertions, 20 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp index 30e4bb8fca..c505217dbb 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp +++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp @@ -173,6 +173,7 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v return true; } else if (key == AUTO_DELETE_TIMEOUT) { autoDeleteDelay = value; + if (autoDeleteDelay) autodelete = true; return true; } else if (key == QueueFlowLimit::flowStopCountKey) { flowStop.setCount(value); diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index e6ea694d54..1d6c835b04 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -333,8 +333,12 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s settings.durable = durable; settings.autodelete = !durable; } + settings.autoDeleteDelay = pn_terminus_get_timeout(source); + if (settings.autoDeleteDelay) { + settings.autodelete = true; + settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay; + } filter.configure(settings); - //TODO: populate settings from source details when available from engine std::stringstream queueName; if (shared) { //just use link name (TODO: could allow this to be diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 6d87e1362a..872e1d59df 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -89,6 +89,7 @@ const std::string NODE("node"); const std::string LINK("link"); const std::string MODE("mode"); const std::string RELIABILITY("reliability"); +const std::string TIMEOUT("timeout"); const std::string NAME("name"); const std::string DURABLE("durable"); const std::string X_DECLARE("x-declare"); @@ -517,14 +518,24 @@ Subscription::Subscription(const Address& address, const std::string& type) : Exchange(address), queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())), durable(Opt(address)/LINK/DURABLE), - //if the link is durable, then assume it is also reliable unless expclitly stated otherwise - //if not assume it is unreliable unless expclitly stated otherwise + //if the link is durable, then assume it is also reliable unless explicitly stated otherwise + //if not assume it is unreliable unless explicitly stated otherwise reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)), actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)), alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str()) { + const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value; + if (timeout) { + if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32()); + } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) { + //if durable but not explicitly reliable, then set a non-zero + //default for the autodelete timeout (previously this would + //have defaulted to autodelete immediately anyway, so the risk + //of the change causing problems is mitigated) + queueOptions.setInt("qpid.auto_delete_delay", 15*60); + } (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); std::string selector = Opt(address)/LINK/SELECTOR; @@ -999,6 +1010,7 @@ Verifier::Verifier() link[NAME] = true; link[DURABLE] = true; link[RELIABILITY] = true; + link[TIMEOUT] = true; link[X_SUBSCRIBE] = true; link[X_DECLARE] = true; link[X_BINDINGS] = true; diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 9ecb46d872..511be7d6fb 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -57,6 +57,7 @@ const std::string PROPERTIES("properties"); const std::string MODE("mode"); const std::string BROWSE("browse"); const std::string CONSUME("consume"); +const std::string TIMEOUT("timeout"); const std::string TYPE("type"); const std::string TOPIC("topic"); @@ -144,6 +145,16 @@ bool test(const Variant::Map& options, const std::string& name) } } +template <typename T> T get(const Variant::Map& options, const std::string& name, T defaultValue) +{ + Variant::Map::const_iterator j = options.find(name); + if (j == options.end()) { + return defaultValue; + } else { + return j->second; + } +} + bool bind(const Variant::Map& options, const std::string& name, std::string& variable) { Variant::Map::const_iterator j = options.find(name); @@ -260,6 +271,8 @@ void write(pn_data_t* data, const Variant& value) break; } } +const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes +const uint32_t DEFAULT_TIMEOUT(0); } AddressHelper::AddressHelper(const Address& address) : @@ -268,6 +281,7 @@ AddressHelper::AddressHelper(const Address& address) : type(address.getType()), durableNode(false), durableLink(false), + timeout(0), browse(false) { verifier.verify(address); @@ -281,6 +295,7 @@ AddressHelper::AddressHelper(const Address& address) : bind(node, CAPABILITIES, capabilities); durableNode = test(node, DURABLE); durableLink = test(link, DURABLE); + timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); std::string mode; if (bind(address, MODE, mode)) { if (mode == BROWSE) { @@ -521,27 +536,30 @@ void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode) if (durableLink) { pn_terminus_set_durability(terminus, PN_DELIVERIES); } - if (mode == FOR_RECEIVER && browse) { - //when PROTON-139 is resolved, set the required delivery-mode - } - //set filter(s): - if (mode == FOR_RECEIVER && !filters.empty()) { - pn_data_t* filter = pn_terminus_filter(terminus); - pn_data_put_map(filter); - pn_data_enter(filter); - for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) { - pn_data_put_symbol(filter, convert(i->name)); - pn_data_put_described(filter); + if (mode == FOR_RECEIVER) { + if (timeout) pn_terminus_set_timeout(terminus, timeout); + if (browse) { + //when PROTON-139 is resolved, set the required delivery-mode + } + //set filter(s): + if (!filters.empty()) { + pn_data_t* filter = pn_terminus_filter(terminus); + pn_data_put_map(filter); pn_data_enter(filter); - if (i->descriptorSymbol.size()) { - pn_data_put_symbol(filter, convert(i->descriptorSymbol)); - } else { - pn_data_put_ulong(filter, i->descriptorCode); + for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) { + pn_data_put_symbol(filter, convert(i->name)); + pn_data_put_described(filter); + pn_data_enter(filter); + if (i->descriptorSymbol.size()) { + pn_data_put_symbol(filter, convert(i->descriptorSymbol)); + } else { + pn_data_put_ulong(filter, i->descriptorCode); + } + write(filter, i->value); + pn_data_exit(filter); } - write(filter, i->value); pn_data_exit(filter); } - pn_data_exit(filter); } } @@ -632,6 +650,7 @@ Verifier::Verifier() link[NAME] = true; link[DURABLE] = true; link[RELIABILITY] = true; + link[TIMEOUT] = true; link[X_SUBSCRIBE] = true; link[X_DECLARE] = true; link[X_BINDINGS] = true; diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h index 4608981a69..581eec5e3b 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -68,6 +68,7 @@ class AddressHelper std::string type; bool durableNode; bool durableLink; + uint32_t timeout; bool browse; std::vector<Filter> filters; |
