From 28094213ff2f572d21b272853fd9063903564a7a Mon Sep 17 00:00:00 2001 From: Andrew Donald Kennedy Date: Thu, 22 Jul 2010 13:09:56 +0000 Subject: QPID-2682: Move slow consumer disconnection mechanism to the broker git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@966637 13f79535-47bb-0310-9956-ffa450edef68 --- .../experimental/slowconsumerdetection/MANIFEST.MF | 30 -- .../experimental/slowconsumerdetection/build.xml | 34 -- .../plugin/SlowConsumerDetectionConfiguration.java | 92 ------ .../SlowConsumerDetectionPolicyConfiguration.java | 76 ----- .../SlowConsumerDetectionQueueConfiguration.java | 153 --------- .../qpid/server/virtualhost/plugin/Activator.java | 62 ---- .../plugin/ConfiguredQueueBindingListener.java | 86 ----- .../virtualhost/plugin/SlowConsumerDetection.java | 161 ---------- .../SlowConsumerDetection_logmessages.properties | 4 - .../TopicDeletePolicy_logmessages.properties | 3 - .../plugin/policies/TopicDeletePolicy.java | 141 --------- .../policies/TopicDeletePolicyConfiguration.java | 81 ----- .../policies/SlowConsumerPolicyPlugin.java | 29 -- .../policies/SlowConsumerPolicyPluginFactory.java | 27 -- .../SlowConsumerDetectionConfigurationTest.java | 346 --------------------- ...owConsumerDetectionPolicyConfigurationTest.java | 104 ------- ...lowConsumerDetectionQueueConfigurationTest.java | 187 ----------- .../TopicDeletePolicyConfigurationTest.java | 88 ------ .../plugin/policies/TopicDeletePolicyTest.java | 293 ----------------- .../org/apache/qpid/systest/GlobalQueuesTest.java | 222 ------------- .../org/apache/qpid/systest/GlobalTopicsTest.java | 36 --- .../qpid/systest/MergeConfigurationTest.java | 124 -------- .../org/apache/qpid/systest/SubscriptionTest.java | 146 --------- .../org/apache/qpid/systest/TestingBaseCase.java | 255 --------------- .../java/org/apache/qpid/systest/TopicTest.java | 85 ----- .../plugin/SlowConsumerDetectionConfiguration.java | 92 ++++++ .../SlowConsumerDetectionPolicyConfiguration.java | 76 +++++ .../SlowConsumerDetectionQueueConfiguration.java | 153 +++++++++ .../apache/qpid/server/plugins/PluginManager.java | 90 +++--- .../plugin/ConfiguredQueueBindingListener.java | 86 +++++ .../virtualhost/plugin/SlowConsumerDetection.java | 161 ++++++++++ .../SlowConsumerDetection_logmessages.properties | 4 + .../TopicDeletePolicy_logmessages.properties | 3 + .../plugin/policies/TopicDeletePolicy.java | 141 +++++++++ .../policies/TopicDeletePolicyConfiguration.java | 81 +++++ .../policies/SlowConsumerPolicyPlugin.java | 29 ++ .../policies/SlowConsumerPolicyPluginFactory.java | 27 ++ .../SlowConsumerDetectionConfigurationTest.java | 346 +++++++++++++++++++++ ...owConsumerDetectionPolicyConfigurationTest.java | 104 +++++++ ...lowConsumerDetectionQueueConfigurationTest.java | 185 +++++++++++ .../TopicDeletePolicyConfigurationTest.java | 88 ++++++ .../plugin/policies/TopicDeletePolicyTest.java | 293 +++++++++++++++++ .../org/apache/qpid/systest/GlobalQueuesTest.java | 222 +++++++++++++ .../org/apache/qpid/systest/GlobalTopicsTest.java | 36 +++ .../qpid/systest/MergeConfigurationTest.java | 124 ++++++++ .../org/apache/qpid/systest/SubscriptionTest.java | 146 +++++++++ .../org/apache/qpid/systest/TestingBaseCase.java | 255 +++++++++++++++ .../java/org/apache/qpid/systest/TopicTest.java | 85 +++++ 48 files changed, 2787 insertions(+), 2905 deletions(-) delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java delete mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java create mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java (limited to 'qpid/java') diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF b/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF deleted file mode 100644 index 3d3d91381b..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF +++ /dev/null @@ -1,30 +0,0 @@ -Manifest-Version: 1.0 -Bundle-ManifestVersion: 2 -Bundle-Name: Qpid Slow Consumer Detection -Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true -Bundle-Version: 1.0.0 -Bundle-Activator: org.apache.qpid.server.virtualhost.plugin.Activator -Import-Package: org.osgi.framework, - org.apache.qpid.server.configuration.plugins, - org.apache.qpid.server.configuration, - org.apache.qpid.server.virtualhost.plugins, - org.apache.qpid.server.virtualhost, - org.apache.qpid.server.queue, - org.apache.qpid.server.binding, - org.apache.qpid.server.exchange, - org.apache.qpid.server.registry, - org.apache.qpid.server.plugins, - org.apache.qpid.server.protocol, - org.apache.qpid.server.logging, - org.apache.qpid.server.logging.actors, - org.apache.qpid.protocol, - org.apache.qpid.framing, - org.apache.qpid, - org.apache.log4j, - org.apache.commons.configuration -Bundle-RequiredExecutionEnvironment: JavaSE-1.6 -Bundle-ClassPath: . -Bundle-ActivationPolicy: lazy -Export-Package: org.apache.qpid.server.virtualhost.plugin;uses:="org.osgi.framework", - org.apache.qpid.server.virtualhost.plugin.policies - diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml b/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml deleted file mode 100644 index 06ebc58030..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml +++ /dev/null @@ -1,34 +0,0 @@ - - - - - - - - - - - - - - - diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java deleted file mode 100644 index dd63c9b698..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.plugin; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.ConversionException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin -{ - public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException - { - SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public List getParentPaths() - { - return Arrays.asList("virtualhosts.virtualhost.slow-consumer-detection"); - } - } - - //Set Default time unit to seconds - TimeUnit _timeUnit = TimeUnit.SECONDS; - - public String[] getElementsProcessed() - { - return new String[]{"delay", - "timeunit"}; - } - - public long getDelay() - { - return getLongValue("delay", 10); - } - - public TimeUnit getTimeUnit() - { - return _timeUnit; - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - validatePositiveLong("delay"); - - String timeUnit = getStringValue("timeunit"); - - if (timeUnit != null) - { - try - { - _timeUnit = TimeUnit.valueOf(timeUnit.toUpperCase()); - } - catch (IllegalArgumentException iae) - { - throw new ConfigurationException("Unable to configure Slow Consumer Detection invalid TimeUnit:" + timeUnit); - } - } - - System.out.println("Configured SCDC"); - System.out.println("Delay:" + getDelay()); - System.out.println("TimeUnit:" + getTimeUnit()); - } -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java deleted file mode 100644 index 8e2ecff6fb..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.plugin; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; - -import java.util.Arrays; -import java.util.List; - -public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin -{ - public static class SlowConsumerDetectionPolicyConfigurationFactory implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException - { - SlowConsumerDetectionPolicyConfiguration slowConsumerConfig = new SlowConsumerDetectionPolicyConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public List getParentPaths() - { - return Arrays.asList( - "virtualhosts.virtualhost.queues.slow-consumer-detection.policy", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy", - "virtualhosts.virtualhost.topics.slow-consumer-detection.policy", - "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy"); - } - } - - public String[] getElementsProcessed() - { - return new String[]{"name"}; - } - - public String getPolicyName() - { - return getStringValue("name"); - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - if (getPolicyName() == null) - { - throw new ConfigurationException("No Slow consumer policy defined."); - } - } - - @Override - public String formatToString() - { - return "Policy:"+getPolicyName(); - } -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java deleted file mode 100644 index e825556e61..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.plugin; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; -import org.apache.qpid.server.plugins.PluginManager; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin -{ - private SlowConsumerPolicyPlugin _policyPlugin; - - public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException - { - SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public List getParentPaths() - { - return Arrays.asList( - "virtualhosts.virtualhost.queues.slow-consumer-detection", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection", - "virtualhosts.virtualhost.topics.slow-consumer-detection", - "virtualhosts.virtualhost.topics.topic.slow-consumer-detection"); - } - } - - public String[] getElementsProcessed() - { - return new String[]{"messageAge", - "depth", - "messageCount"}; - } - - public long getMessageAge() - { - return getLongValue("messageAge"); - } - - public long getDepth() - { - return getLongValue("depth"); - } - - public long getMessageCount() - { - return getLongValue("messageCount"); - } - - public SlowConsumerPolicyPlugin getPolicy() - { - return _policyPlugin; - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - if (!containsPositiveLong("messageAge") && - !containsPositiveLong("depth") && - !containsPositiveLong("messageCount")) - { - throw new ConfigurationException("At least one configuration property" + - "('messageAge','depth' or 'messageCount') must be specified."); - } - - SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName()); - - PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); - Map factories = pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class); - - if (policyConfig == null) - { - throw new ConfigurationException("No Slow Consumer Policy specified. Known Policies:" + factories.keySet()); - } - - if (_logger.isDebugEnabled()) - { - Iterator keys = policyConfig.getConfig().getKeys(); - - while (keys.hasNext()) - { - String key = (String) keys.next(); - - _logger.debug("Policy Keys:" + key); - } - - } - - SlowConsumerPolicyPluginFactory pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase()); - - if (pluginFactory == null) - { - throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet()); - } - - _policyPlugin = pluginFactory.newInstance(policyConfig); - - // Debug the creation of this Config - _logger.debug(this); - } - - public String formatToString() - { - StringBuilder sb = new StringBuilder(); - if (getMessageAge() > 0) - { - sb.append("Age:").append(getMessageAge()).append(":"); - } - if (getDepth() > 0) - { - sb.append("Depth:").append(getDepth()).append(":"); - } - if (getMessageCount() > 0) - { - sb.append("Count:").append(getMessageCount()).append(":"); - } - - sb.append("Policy[").append(getPolicy()).append("]"); - return sb.toString(); - } -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java deleted file mode 100644 index 7b0168d436..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin; - -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; -import org.apache.qpid.server.virtualhost.plugin.policies.TopicDeletePolicy; -import org.apache.qpid.server.virtualhost.plugin.policies.TopicDeletePolicyConfiguration; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; - -/** - * Activator that loads our OSGi bundles for the Slow Consumer Detection plugin. - * - * This includes Configuration - * - * @author ritchiem - */ -public class Activator implements BundleActivator -{ - public void start(BundleContext ctx) throws Exception - { - if (null != ctx) - { - ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory(), null); - ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory(), null); - ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory(), null); - ctx.registerService(VirtualHostPluginFactory.class.getName(), new SlowConsumerDetection.SlowConsumerFactory(), null); - - ctx.registerService(SlowConsumerPolicyPluginFactory.class.getName(), new TopicDeletePolicy.TopicDeletePolicyFactory(), null); - ctx.registerService(ConfigurationPluginFactory.class.getName(), new TopicDeletePolicyConfiguration.TopicDeletePolicyConfigurationFactory(), null); - } - } - - public void stop(BundleContext ctx) throws Exception - { - // no need to do anything here, osgi will unregister the service for us - } - -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java deleted file mode 100644 index d947e9a367..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java +++ /dev/null @@ -1,86 +0,0 @@ -package org.apache.qpid.server.virtualhost.plugin; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.exchange.AbstractExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.Exchange.BindingListener; -import org.apache.qpid.server.queue.AMQQueue; - -/** - * This is a listener that caches queues that are configured for slow consumer disconnection. - * - * There should be one listener per virtual host, which can be added to all exchanges on - * that host. - * - * TODO In future, it will be possible to configure the policy at runtime, so only the queue - * itself is cached, and the configuration looked up by the housekeeping thread. This means - * that there may be occasions where the copy of the cache contents retrieved by the thread - * does not contain queues that are configured, or that configured queues are not present. - * - * @see BindingListener - */ -public class ConfiguredQueueBindingListener implements BindingListener -{ - private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class); - - private String _vhostName; - private Set _cache = Collections.synchronizedSet(new HashSet()); - - public ConfiguredQueueBindingListener(String vhostName) - { - _vhostName = vhostName; - } - - /** - * @see BindingListener#bindingAdded(Exchange, Binding) - */ - public void bindingAdded(Exchange exchange, Binding binding) - { - processBinding(binding); - } - - /** - * @see BindingListener#bindingRemoved(Exchange, Binding) - */ - public void bindingRemoved(Exchange exchange, Binding binding) - { - processBinding(binding); - } - - private void processBinding(Binding binding) - { - AMQQueue queue = binding.getQueue(); - - SlowConsumerDetectionQueueConfiguration config = - queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); - if (config != null) - { - _cache.add(queue); - } - else - { - _cache.remove(queue); - } - } - - /** - * Lookup and return the cache of configured {@link AMQQueue}s. - * - * Note that when accessing the cached queues, the {@link Iterator} is not thread safe - * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the - * cache is returned. - * - * @return a copy of the cached {@link java.util.Set} of queues - */ - public Set getQueueCache() - { - return new HashSet(_cache); - } -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java deleted file mode 100644 index 7de95bbfa7..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.plugins.Plugin; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.plugin.logging.SlowConsumerDetectionMessages; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostHouseKeepingPlugin; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; - -class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin -{ - private SlowConsumerDetectionConfiguration _config; - private ConfiguredQueueBindingListener _listener; - - public static class SlowConsumerFactory implements VirtualHostPluginFactory - { - public SlowConsumerDetection newInstance(VirtualHost vhost) - { - SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName()); - - if (config == null) - { - return null; - } - - SlowConsumerDetection plugin = new SlowConsumerDetection(vhost); - plugin.configure(config); - return plugin; - } - } - - /** - * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this - * cirtual host to record all the configured queues in a cache for processing by the housekeeping - * thread. - * - * @see Plugin#configure(ConfigurationPlugin) - */ - public void configure(ConfigurationPlugin config) - { - _config = (SlowConsumerDetectionConfiguration) config; - _listener = new ConfiguredQueueBindingListener(_virtualhost.getName()); - for (AMQShortString exchangeName : _virtualhost.getExchangeRegistry().getExchangeNames()) - { - _virtualhost.getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener); - } - } - - public SlowConsumerDetection(VirtualHost vhost) - { - super(vhost); - } - - public void execute() - { - CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING()); - - Set cache = _listener.getQueueCache(); - for (AMQQueue q : cache) - { - CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName())); - - try - { - SlowConsumerDetectionQueueConfiguration config = - q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); - if (checkQueueStatus(q, config)) - { - config.getPolicy().performPolicy(q); - } - } - catch (Exception e) - { - // Don't throw exceptions as this will stop the house keeping task from running. - _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e); - } - } - - CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE()); - } - - public long getDelay() - { - return _config.getDelay(); - } - - public TimeUnit getTimeUnit() - { - return _config.getTimeUnit(); - } - - /** - * Check the depth,messageSize,messageAge,messageCount values for this q - * - * @param q the queue to check - * @param config the queue configuration to compare against the queue state - * - * @return true if the queue has reached a threshold. - */ - private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) - { - if (config != null) - { - _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); - - int count = q.getMessageCount(); - - // First Check message counts - if ((config.getMessageCount() != 0 && count >= config.getMessageCount()) || - // The check queue depth - (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) || - // finally if we have messages on the queue check Arrival time. - // We must check count as OldestArrival time is Long.MAX_LONG when - // there are no messages. - (config.getMessageAge() != 0 && - ((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge()))) - { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Detected Slow Consumer on Queue(" + q.getName() + ")"); - _logger.debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount()); - _logger.debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()); - _logger.debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()); - } - - return true; - } - } - return false; - } - -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties deleted file mode 100644 index 2714935a71..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties +++ /dev/null @@ -1,4 +0,0 @@ -#SlowConsumerDetection.logMessages -RUNNING = SCD-1001 : Running -COMPLETE = SCD-1002 : Complete -CHECKING_QUEUE = SCD-1003 : Checking Status of Queue {0} \ No newline at end of file diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties deleted file mode 100644 index d0f5965c39..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties +++ /dev/null @@ -1,3 +0,0 @@ -#TopicDeletePolicy.logMessages -DELETING_QUEUE = TDP-1001 : Deleting Queue -DISCONNECTING = TDP-1002 : Disconnecting Session \ No newline at end of file diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java deleted file mode 100644 index 3bd4ae8d4e..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.policies; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.plugin.logging.TopicDeletePolicyMessages; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; - -public class TopicDeletePolicy implements SlowConsumerPolicyPlugin -{ - Logger _logger = Logger.getLogger(TopicDeletePolicy.class); - private TopicDeletePolicyConfiguration _configuration; - - public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory - { - public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException - { - TopicDeletePolicyConfiguration config = - configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName()); - - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(config); - return policy; - } - - public String getPluginName() - { - return "topicdelete"; - } - - public Class getPluginClass() - { - return TopicDeletePolicy.class; - } - } - - public void performPolicy(AMQQueue q) - { - if (q == null) - { - return; - } - - AMQSessionModel owner = q.getExclusiveOwningSession(); - - // Only process exclusive queues - if (owner == null) - { - return; - } - - //Only process Topics - if (!validateQueueIsATopic(q)) - { - return; - } - - try - { - CurrentActor.get().message(owner.getLogSubject(),TopicDeletePolicyMessages.DISCONNECTING()); - // Close the consumer . this will cause autoDelete Queues to be purged - owner.getConnectionModel(). - closeSession(owner, AMQConstant.RESOURCE_ERROR, - "Consuming to slow."); - - // Actively delete non autoDelete queues if deletePersistent is set - if (!q.isAutoDelete() && (_configuration != null && _configuration.deletePersistent())) - { - CurrentActor.get().message(q.getLogSubject(), TopicDeletePolicyMessages.DELETING_QUEUE()); - q.delete(); - } - - } - catch (AMQException e) - { - _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName()); - } - - } - - /** - * Check the queue bindings to validate the queue is bound to the - * topic exchange. - * - * @param q the Queue - * - * @return true iff Q is bound to a TopicExchange - */ - private boolean validateQueueIsATopic(AMQQueue q) - { - for (Binding binding : q.getBindings()) - { - if (binding.getExchange() instanceof TopicExchange) - { - return true; - } - } - - return false; - } - - public void configure(ConfigurationPlugin config) - { - _configuration = (TopicDeletePolicyConfiguration) config; - } - - @Override - public String toString() - { - return "TopicDelete" + (_configuration == null ? "" : "[" + _configuration + "]"); - } -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java deleted file mode 100644 index e6ad1cbcc3..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.policies; - -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; - -public class TopicDeletePolicyConfiguration extends ConfigurationPlugin -{ - - public static class TopicDeletePolicyConfigurationFactory - implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, - Configuration config) - throws ConfigurationException - { - TopicDeletePolicyConfiguration slowConsumerConfig = - new TopicDeletePolicyConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public List getParentPaths() - { - return Arrays.asList( - "virtualhosts.virtualhost.queues.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.topics.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy.topicDelete"); - } - } - - public String[] getElementsProcessed() - { - return new String[]{"delete-persistent"}; - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - // No validation required. - } - - public boolean deletePersistent() - { - // If we don't have configuration then we don't deletePersistent Queues - return (hasConfiguration() && contains("delete-persistent")); - } - - @Override - public String formatToString() - { - return (deletePersistent()?"delete-durable":""); - } - - -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java deleted file mode 100644 index 7f600abdc9..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.slowconsumerdetection.policies; - -import org.apache.qpid.server.plugins.Plugin; -import org.apache.qpid.server.queue.AMQQueue; - -public interface SlowConsumerPolicyPlugin extends Plugin -{ - public void performPolicy(AMQQueue Queue); -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java deleted file mode 100644 index b2fe6766a6..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.slowconsumerdetection.policies; - -import org.apache.qpid.server.plugins.PluginFactory; - -public interface SlowConsumerPolicyPluginFactory

extends PluginFactory

-{ -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java deleted file mode 100644 index 40dc382d30..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin; - -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; -import org.apache.qpid.server.util.InternalBrokerBaseCase; - -import java.util.concurrent.TimeUnit; - -/** - * Provide Unit Test coverage of the virtualhost SlowConsumer Configuration - * This is what controls how often the plugin will execute - */ -public class SlowConsumerDetectionConfigurationTest extends InternalBrokerBaseCase -{ - - /** - * Default Testing: - * - * Provide a fully complete and valid configuration specifying 'delay' and - * 'timeunit' and ensure that it is correctly processed. - * - * Ensure no exceptions are thrown and that we get the same values back that - * were put into the configuration. - */ - public void testConfigLoadingValidConfig() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - long DELAY=10; - String TIMEUNIT=TimeUnit.MICROSECONDS.toString(); - xmlconfig.addProperty("delay", String.valueOf(DELAY)); - xmlconfig.addProperty("timeunit", TIMEUNIT); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - } - catch (ConfigurationException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - - assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); - assertEquals("TimeUnit not correctly returned.", - TIMEUNIT, String.valueOf(config.getTimeUnit())); - } - - /** - * Default Testing: - * - * Test Missing TimeUnit value gets default. - * - * The TimeUnit value is optional and default to SECONDS. - * - * Test that if we do not specify a TimeUnit then we correctly get seconds. - * - * Also verify that relying on the default does not impact the setting of - * the 'delay' value. - * - */ - public void testConfigLoadingMissingTimeUnitDefaults() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - long DELAY=10; - xmlconfig.addProperty("delay", String.valueOf(DELAY)); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - try - { - config.setConfiguration("", composite); - } - catch (ConfigurationException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - - assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); - assertEquals("Default TimeUnit incorrect", TimeUnit.SECONDS, config.getTimeUnit()); - } - - /** - * Input Testing: - * - * TimeUnit parsing requires the String value be in UpperCase. - * Ensure we can handle when the user doesn't know this. - * - * Same test as 'testConfigLoadingValidConfig' but checking that - * the timeunit field is not case sensitive. - * i.e. the toUpper is being correctly applied. - */ - public void testConfigLoadingValidConfigStrangeTimeUnit() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - long DELAY=10; - - xmlconfig.addProperty("delay", DELAY); - xmlconfig.addProperty("timeunit", "MiCrOsEcOnDs"); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - } - catch (ConfigurationException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - - assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); - assertEquals("TimeUnit not correctly returned.", - TimeUnit.MICROSECONDS.toString(), String.valueOf(config.getTimeUnit())); - - } - - /** - * Failure Testing: - * - * Test that delay must be long not a string value. - * Provide a delay as a written value not a long. 'ten'. - * - * This should throw a configuration exception which is being trapped and - * verified to be the right exception, a NumberFormatException. - * - */ - public void testConfigLoadingInValidDelayString() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("delay", "ten"); - xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("Configuration should fail to validate"); - } - catch (ConfigurationException e) - { - Throwable cause = e.getCause(); - - assertEquals("Cause not correct", NumberFormatException.class, cause.getClass()); - } - } - - /** - * Failure Testing: - * - * Test that negative delays are invalid. - * - * Delay must be a positive value as negative delay means doesn't make sense. - * - * Configuration exception with a useful message should be thrown here. - * - */ - public void testConfigLoadingInValidDelayNegative() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("delay", "-10"); - xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("Configuration should fail to validate"); - } - catch (ConfigurationException e) - { - Throwable cause = e.getCause(); - - assertNotNull("Configuration Exception must not be null.", cause); - assertEquals("Cause not correct", - ConfigurationException.class, cause.getClass()); - assertEquals("Incorrect message.", - "SlowConsumerDetectionConfiguration: 'delay' must be a Positive Long value.", - cause.getMessage()); - } - } - - /** - * Failure Testing: - * - * Test that delay cannot be 0. - * - * A zero delay means run constantly. This is not how VirtualHostTasks - * are designed to be run so we dis-allow the use of 0 delay. - * - * Same test as 'testConfigLoadingInValidDelayNegative' but with a 0 value. - * - */ - public void testConfigLoadingInValidDelayZero() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("delay", "0"); - xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("Configuration should fail to validate"); - } - catch (ConfigurationException e) - { - Throwable cause = e.getCause(); - - assertNotNull("Configuration Exception must not be null.", cause); - assertEquals("Cause not correct", - ConfigurationException.class, cause.getClass()); - assertEquals("Incorrect message.", - "SlowConsumerDetectionConfiguration: 'delay' must be a Positive Long value.", - cause.getMessage()); - } - } - - /** - * Failure Testing: - * - * Test that missing delay fails. - * If we have no delay then we do not pick a default. So a Configuration - * Exception is thrown. - * - * */ - public void testConfigLoadingInValidMissingDelay() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("timeunit", TimeUnit.SECONDS.toString()); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - try - { - config.setConfiguration("", composite); - fail("Configuration should fail to validate"); - } - catch (ConfigurationException e) - { - assertEquals("Incorrect message.", "SlowConsumerDetectionConfiguration: unable to configure invalid delay:null", e.getMessage()); - } - } - - /** - * Failure Testing: - * - * Test that erroneous TimeUnit fails. - * - * Valid TimeUnit values vary based on the JVM version i.e. 1.6 added HOURS/DAYS etc. - * - * We don't test the values for TimeUnit are accepted other than MILLISECONDS in the - * positive testing at the start. - * - * Here we ensure that an erroneous for TimeUnit correctly throws an exception. - * - * We test with 'foo', which will never be a TimeUnit - * - */ - public void testConfigLoadingInValidTimeUnit() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - String TIMEUNIT = "foo"; - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("delay", "10"); - xmlconfig.addProperty("timeunit", TIMEUNIT); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - try - { - config.setConfiguration("", composite); - fail("Configuration should fail to validate"); - } - catch (ConfigurationException e) - { - assertEquals("Incorrect message.", "Unable to configure Slow Consumer Detection invalid TimeUnit:" + TIMEUNIT, e.getMessage()); - } - } - - -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java deleted file mode 100644 index 67c177f099..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin; - -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; -import org.apache.qpid.server.util.InternalBrokerBaseCase; - -/** - * Test class to ensure that the policy configuration can be processed. - */ -public class SlowConsumerDetectionPolicyConfigurationTest extends InternalBrokerBaseCase -{ - - /** - * Input Testing: - * - * Test that a given String can be set and retrieved through the configuration - * - * No validation is being performed to ensure that the policy exists. Only - * that a value can be set for the policy. - * - */ - public void testConfigLoadingValidConfig() - { - SlowConsumerDetectionPolicyConfiguration config = new SlowConsumerDetectionPolicyConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - String policyName = "TestPolicy"; - xmlconfig.addProperty("name", policyName); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - } - catch (ConfigurationException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - - assertEquals("Policy name not retrieved as expected.", - policyName, config.getPolicyName()); - } - - /** - * Failure Testing: - * - * Test that providing a configuration section without the 'name' field - * causes an exception to be thrown. - * - * An empty configuration is provided and the thrown exception message - * is checked to confirm the right reason. - * - */ - public void testConfigLoadingInValidConfig() - { - SlowConsumerDetectionPolicyConfiguration config = new SlowConsumerDetectionPolicyConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("Config is invalid so won't validate."); - } - catch (ConfigurationException e) - { - e.printStackTrace(); - assertEquals("Exception message not as expected.", "No Slow consumer policy defined.", e.getMessage()); - } - } - -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java deleted file mode 100644 index 57e3233eeb..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin; - -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.util.InternalBrokerBaseCase; - -/** - * Unit test the QueueConfiguration processing. - * - * This is slightly awkward as the SCDQC requries that a policy be available. - * - * So all the Valid test much catch the ensuing ConfigurationException and - * validate that the error is due to a lack of a valid Policy - */ -public class SlowConsumerDetectionQueueConfigurationTest extends InternalBrokerBaseCase -{ - - /** - * Test a fully loaded configuration file. - * - * It is not an error to have all control values specified. - * - * Here we need to catch the ConfigurationException that ensures due to lack - * of a Policy Plugin - */ - public void testConfigLoadingValidConfig() - { - SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("messageAge", "60000"); - xmlconfig.addProperty("depth", "1024"); - xmlconfig.addProperty("messageCount", "10"); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("No Policies are avaialbe to load in a unit test"); - } - catch (ConfigurationException e) - { - assertEquals("No Slow Consumer Policy specified. Known Policies:[]", - e.getMessage()); - } - } - - /** - * When we do not specify any control value then a ConfigurationException - * must be thrown to remind us. - */ - public void testConfigLoadingMissingConfig() - { - SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("No Policies are avaialbe to load in a unit test"); - } - catch (ConfigurationException e) - { - - assertEquals("At least one configuration property('messageAge','depth'" + - " or 'messageCount') must be specified.", e.getMessage()); - } - } - - /** - * Setting messageAge on its own is enough to have a valid configuration - * - * Here we need to catch the ConfigurationException that ensures due to lack - * of a Policy Plugin - */ - public void testConfigLoadingMessageAgeOk() - { - SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - xmlconfig.addProperty("messageAge", "60000"); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("No Policies are avaialbe to load in a unit test"); - } - catch (ConfigurationException e) - { - assertEquals("No Slow Consumer Policy specified. Known Policies:[]", - e.getMessage()); - } - } - - /** - * Setting depth on its own is enough to have a valid configuration - * - * Here we need to catch the ConfigurationException that ensures due to lack - * of a Policy Plugin - */ - public void testConfigLoadingDepthOk() - { - SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - xmlconfig.addProperty("depth", "1024"); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("No Policies are avaialbe to load in a unit test"); - } - catch (ConfigurationException e) - { - assertEquals("No Slow Consumer Policy specified. Known Policies:[]", - e.getMessage()); - } - } - - /** - * Setting messageCount on its own is enough to have a valid configuration - * - * Here we need to catch the ConfigurationException that ensures due to lack - * of a Policy Plugin - */ - public void testConfigLoadingMessageCountOk() - { - SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - xmlconfig.addProperty("messageCount", "10"); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("No Policies are avaialbe to load in a unit test"); - } - catch (ConfigurationException e) - { - assertEquals("No Slow Consumer Policy specified. Known Policies:[]", - e.getMessage()); - } - } -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java deleted file mode 100644 index 8b729a0f43..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.policies; - -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.util.InternalBrokerBaseCase; - -/** - * Test to ensure TopicDelete Policy configuration can be loaded. - */ -public class TopicDeletePolicyConfigurationTest extends InternalBrokerBaseCase -{ - /** - * Test without any configuration being provided that the - * deletePersistent option is disabled. - */ - public void testNoConfigNoDeletePersistent() - { - TopicDeletePolicyConfiguration config = new TopicDeletePolicyConfiguration(); - - assertFalse("TopicDelete Configuration with no config should not delete persistent queues.", - config.deletePersistent()); - } - - /** - * Test that with the correct configuration the deletePersistent option can - * be enabled. - * - * Test creates a new Configuration object and passes in the xml snippet - * that the ConfigurationPlugin would receive during normal execution. - * This is the XML that would be matched for this plugin: - * - * - * - * - * So it would be subset and passed in as just: - * - * - * - * The property should therefore be enabled. - * - */ - public void testConfigDeletePersistent() - { - TopicDeletePolicyConfiguration config = new TopicDeletePolicyConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("delete-persistent",""); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("",composite); - } - catch (ConfigurationException e) - { - fail(e.getMessage()); - } - - assertTrue("A configured TopicDelete should delete persistent queues.", - config.deletePersistent()); - } - -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java deleted file mode 100644 index 364766dfa7..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java +++ /dev/null @@ -1,293 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.policies; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.InternalTestProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.InternalBrokerBaseCase; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class TopicDeletePolicyTest extends InternalBrokerBaseCase -{ - - TopicDeletePolicyConfiguration _config; - - VirtualHost _defaultVhost; - InternalTestProtocolSession _connection; - - public void setUp() throws Exception - { - super.setUp(); - - _defaultVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getDefaultVirtualHost(); - - _connection = new InternalTestProtocolSession(_defaultVhost); - - _config = new TopicDeletePolicyConfiguration(); - - XMLConfiguration config = new XMLConfiguration(); - - _config.setConfiguration("", config); - } - - private MockAMQQueue createOwnedQueue() - { - MockAMQQueue queue = new MockAMQQueue("testQueue"); - - _defaultVhost.getQueueRegistry().registerQueue(queue); - - try - { - AMQChannel channel = new AMQChannel(_connection, 0, null); - _connection.addChannel(channel); - - queue.setExclusiveOwningSession(channel); - } - catch (AMQException e) - { - fail("Unable to create Channel:" + e.getMessage()); - } - - return queue; - } - - private void setQueueToAutoDelete(final AMQQueue queue) - { - ((MockAMQQueue) queue).setAutoDelete(true); - - queue.setDeleteOnNoConsumers(true); - final AMQProtocolSession.Task deleteQueueTask = - new AMQProtocolSession.Task() - { - public void doTask(AMQProtocolSession session) throws AMQException - { - queue.delete(); - } - }; - - ((AMQChannel) queue.getExclusiveOwningSession()).getProtocolSession().addSessionCloseTask(deleteQueueTask); - } - - /** Check that a null queue passed in does not upset the policy. */ - public void testNullQueueParameter() throws ConfigurationException - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - try - { - policy.performPolicy(null); - } - catch (Exception e) - { - fail("Exception should not be thrown:" + e.getMessage()); - } - - } - - /** - * Set a owning Session to null which means this is not an exclusive queue - * so the queue should not be deleted - */ - public void testNonExclusiveQueue() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - MockAMQQueue queue = createOwnedQueue(); - - queue.setExclusiveOwningSession(null); - - policy.performPolicy(queue); - - assertFalse("Queue should not be deleted", queue.isDeleted()); - assertFalse("Connection should not be closed", _connection.isClosed()); - } - - /** - * Test that exclusive JMS Queues are not deleted. - * Bind the queue to the direct exchange (so it is a JMS Queue). - * - * JMS Queues are not to be processed so this should not delete the queue. - */ - public void testQueuesAreNotProcessed() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new DirectExchange(), null)); - - policy.performPolicy(queue); - - assertFalse("Queue should not be deleted", queue.isDeleted()); - assertFalse("Connection should not be closed", _connection.isClosed()); - } - - /** - * Give a non auto-delete queue is bound to the topic exchange the - * TopicDeletePolicy will close the connection and delete the queue, - */ - public void testNonAutoDeleteTopicIsNotClosed() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); - - queue.setAutoDelete(false); - - policy.performPolicy(queue); - - assertFalse("Queue should not be deleted", queue.isDeleted()); - assertTrue("Connection should be closed", _connection.isClosed()); - } - - /** - * Give a auto-delete queue bound to the topic exchange the TopicDeletePolicy will - * close the connection and delete the queue - */ - public void testTopicIsClosed() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - final MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); - - setQueueToAutoDelete(queue); - - policy.performPolicy(queue); - - assertTrue("Queue should be deleted", queue.isDeleted()); - assertTrue("Connection should be closed", _connection.isClosed()); - } - - /** - * Give a queue bound to the topic exchange the TopicDeletePolicy will - * close the connection and NOT delete the queue - */ - public void testNonAutoDeleteTopicIsClosedNotDeleted() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); - - policy.performPolicy(queue); - - assertFalse("Queue should not be deleted", queue.isDeleted()); - assertTrue("Connection should be closed", _connection.isClosed()); - } - - /** - * Give a queue bound to the topic exchange the TopicDeletePolicy suitably - * configured with the delete-persistent tag will close the connection - * and delete the queue - */ - public void testPersistentTopicIsClosedAndDeleted() - { - //Set the config to delete persistent queues - _config.getConfig().addProperty("delete-persistent", ""); - - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - assertTrue("Config was not updated to delete Persistent topics", - _config.deletePersistent()); - - MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); - - policy.performPolicy(queue); - - assertTrue("Queue should be deleted", queue.isDeleted()); - assertTrue("Connection should be closed", _connection.isClosed()); - } - - /** - * Give a queue bound to the topic exchange the TopicDeletePolicy not - * configured to close a persistent queue - */ - public void testPersistentTopicIsClosedAndDeletedNullConfig() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - // Explicity say we are not configuring the policy. - policy.configure(null); - - MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); - - policy.performPolicy(queue); - - assertFalse("Queue should not be deleted", queue.isDeleted()); - assertTrue("Connection should be closed", _connection.isClosed()); - } - - public void testNonExclusiveQueueNullConfig() - { - _config = null; - testNonExclusiveQueue(); - } - - public void testQueuesAreNotProcessedNullConfig() - { - _config = null; - testQueuesAreNotProcessed(); - } - - public void testNonAutoDeleteTopicIsNotClosedNullConfig() - { - _config = null; - testNonAutoDeleteTopicIsNotClosed(); - } - - public void testTopicIsClosedNullConfig() - { - _config = null; - testTopicIsClosed(); - } - - public void testNonAutoDeleteTopicIsClosedNotDeletedNullConfig() throws AMQException - { - _config = null; - testNonAutoDeleteTopicIsClosedNotDeleted(); - } - -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java deleted file mode 100644 index e0934faf44..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; - -import javax.jms.Session; -import javax.naming.NamingException; -import java.io.IOException; - -/** - * QPID-1447 : Add slow consumer detection and disconnection. - * - * Slow consumers should on a topic should expect to receive a - * 506 : Resource Error if the hit a predefined threshold. - */ -public class GlobalQueuesTest extends TestingBaseCase -{ - - protected String CONFIG_SECTION = ".queues"; - - /** - * Queue Configuration - - - - 4235264 - - - 600000 - - - 50 - - - - TopicDelete - - - - - - - */ - - /** - * VirtualHost Plugin Configuration - - - 1 - MINUTES - - - */ - - public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException - { - setProperty(CONFIG_SECTION + ".slow-consumer-detection." + - "policy.name", "TopicDelete"); - - setProperty(CONFIG_SECTION + ".slow-consumer-detection." + - property, value); - - if (deleteDurable) - { - setProperty(CONFIG_SECTION + ".slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); - } - } - - /** - * Test that setting messageCount takes affect on topics - * - * We send 10 messages and disconnect at 9 - * - * @throws Exception - */ - public void testTopicConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), false); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, false); - } - - /** - * Test that setting depth has an effect on topics - * - * Sets the message size for the test - * Sets the depth to be 9 * the depth - * Ensure that sending 10 messages causes the disconnection - * - * @throws Exception - */ - public void testTopicConsumerMessageSize() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), false); - - //Start the broker - startBroker(); - - setMessageSize(MESSAGE_SIZE); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, false); - } - - /** - * Test that setting messageAge has an effect on topics - * - * Sets the messageAge to be half the disconnection wait timeout - * Send 10 messages and then ensure that we get disconnected as we will - * wait for the full timeout. - * - * @throws Exception - */ - public void testTopicConsumerMessageAge() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 2), false); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, false); - } - - /** - * Test that setting messageCount takes affect on a durable Consumer - * - * Ensure we set the delete-persistent option - * - * We send 10 messages and disconnect at 9 - * - * @throws Exception - */ - - public void testTopicDurableConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - /** - * Test that setting depth has an effect on durable consumer topics - * - * Ensure we set the delete-persistent option - * - * Sets the message size for the test - * Sets the depth to be 9 * the depth - * Ensure that sending 10 messages causes the disconnection - * - * @throws Exception - */ - public void testTopicDurableConsumerMessageSize() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true); - - //Start the broker - startBroker(); - - setMessageSize(MESSAGE_SIZE); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - /** - * Test that setting messageAge has an effect on topics - * - * Ensure we set the delete-persistent option - * - * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) - * Send 10 messages and then ensure that we get disconnected as we will - * wait for the full timeout. - * - * @throws Exception - */ - public void testTopicDurableConsumerMessageAge() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java deleted file mode 100644 index aff5d1b1b8..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; - -import javax.naming.NamingException; -import java.io.IOException; - -public class GlobalTopicsTest extends GlobalQueuesTest -{ - @Override - public void setUp() throws Exception - { - CONFIG_SECTION = ".topics"; - super.setUp(); - } -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java deleted file mode 100644 index e4efac60f8..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.NamingException; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class MergeConfigurationTest extends TestingBaseCase -{ - - protected int topicCount = 0; - - - public void configureTopic(String topic, int msgCount) throws NamingException, IOException, ConfigurationException - { - - setProperty(".topics.topic("+topicCount+").name", topic); - setProperty(".topics.topic("+topicCount+").slow-consumer-detection.messageCount", String.valueOf(msgCount)); - setProperty(".topics.topic("+topicCount+").slow-consumer-detection.policy.name", "TopicDelete"); - topicCount++; - } - - - /** - * Test that setting messageCount takes affect on topics - * - * We send 10 messages and disconnect at 9 - * - * @throws Exception - */ - public void testTopicConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT * 4) - 1); - - //Configure topic as a subscription - setProperty(".topics.topic("+topicCount+").subscriptionName", "clientid:"+getTestQueueName()); - configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT - 1)); - - - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - -// -// public void testMerge() throws ConfigurationException, AMQException -// { -// -// AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"), false, new AMQShortString("testowner"), -// false, false, _virtualHost, null); -// -// _virtualHost.getQueueRegistry().registerQueue(queue); -// Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); -// _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null); -// -// -// Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME); -// _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange, null); -// -// TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName()); -// -// assertNotNull("Queue should have topic configuration bound to it.", config); -// assertEquals("Configuration name not correct", getName() + ":stockSubscription", config.getSubscriptionName()); -// -// ConfigurationPlugin scdConfig = queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); -// if (scdConfig instanceof org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration) -// { -// System.err.println("********************** scd is a SlowConsumerDetectionQueueConfiguration."); -// } -// else -// { -// System.err.println("********************** Test SCD "+SlowConsumerDetectionQueueConfiguration.class.getClassLoader()); -// System.err.println("********************** Broker SCD "+scdConfig.getClass().getClassLoader()); -// System.err.println("********************** Broker SCD "+scdConfig.getClass().isAssignableFrom(SlowConsumerDetectionQueueConfiguration.class)); -// System.err.println("********************** is a "+scdConfig.getClass()); -// } -// -// assertNotNull("Queue should have scd configuration bound to it.", scdConfig); -// assertEquals("MessageCount is not correct", 10 , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getMessageCount()); -// assertEquals("Policy is not correct", TopicDeletePolicy.class.getName() , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getPolicy().getClass().getName()); -// } - -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java deleted file mode 100644 index 9e9375fd44..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; - -import javax.jms.Session; -import javax.naming.NamingException; -import java.io.IOException; - -/** - * Test SCD when configured with Subscription details. - * - * We run the subscription based tests here to validate that the - * subscriptionname value is correctly associated with the subscription. - * - * - */ -public class SubscriptionTest extends TestingBaseCase -{ - private int _count=0; - protected String CONFIG_SECTION = ".topics.topic"; - - /** - * Add configuration for the queue that relates just to this test. - * We use the getTestQueueName() as our subscription. To ensure the - * config sections do not overlap we identify each section with a _count - * value. - * - * This would allow each test to configure more than one section. - * - * @param property to set - * @param value the value to set - * @param deleteDurable should deleteDurable be set. - * @throws NamingException - * @throws IOException - * @throws ConfigurationException - */ - public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException - { - setProperty(CONFIG_SECTION + "("+_count+").subscriptionName", "clientid:"+getTestQueueName()); - - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - "policy.name", "TopicDelete"); - - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - property, value); - - if (deleteDurable) - { - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); - } - _count++; - } - - - /** - * Test that setting messageCount takes affect on a durable Consumer - * - * Ensure we set the delete-persistent option - * - * We send 10 messages and disconnect at 9 - * - * @throws Exception - */ - - public void testTopicDurableConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - /** - * Test that setting depth has an effect on durable consumer topics - * - * Ensure we set the delete-persistent option - * - * Sets the message size for the test - * Sets the depth to be 9 * the depth - * Ensure that sending 10 messages causes the disconnection - * - * @throws Exception - */ - public void testTopicDurableConsumerMessageSize() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true); - - //Start the broker - startBroker(); - - setMessageSize(MESSAGE_SIZE); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - /** - * Test that setting messageAge has an effect on topics - * - * Ensure we set the delete-persistent option - * - * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) - * Send 10 messages and then ensure that we get disconnected as we will - * wait for the full timeout. - * - * @throws Exception - */ - public void testTopicDurableConsumerMessageAge() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java deleted file mode 100644 index 9831c74574..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.NamingException; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener -{ - - Topic _destination; - protected CountDownLatch _disconnectionLatch = new CountDownLatch(1); - protected int MAX_QUEUE_MESSAGE_COUNT; - protected int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE; - - private Thread _publisher; - protected static final long DISCONNECTION_WAIT = 5; - protected Exception _publisherError = null; - protected JMSException _connectionException = null; - private static final long JOIN_WAIT = 5000; - - @Override - public void setUp() throws Exception - { - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".slow-consumer-detection.delay", "1"); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".slow-consumer-detection.timeunit", "SECONDS"); - - } - - - protected void setProperty(String property, String value) throws NamingException, IOException, ConfigurationException - { - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - property, value); - } - - - /** - * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT - * messages to the provided destination. Messages are sent in a new connection - * on a transaction. Any error is captured and the test is signalled to exit. - * - * @param destination - */ - private void startPublisher(final Destination destination) - { - _publisher = new Thread(new Runnable() - { - - public void run() - { - try - { - Connection connection = getConnection(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - MessageProducer publisher = session.createProducer(destination); - - for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) - { - publisher.send(createNextMessage(session, count)); - session.commit(); - } - } - catch (Exception e) - { - _publisherError = e; - _disconnectionLatch.countDown(); - } - } - }); - - _publisher.start(); - } - - - - /** - * Perform the Main test of a topic Consumer with the given AckMode. - * - * Test creates a new connection and sets up the connection to prevent - * failover - * - * A new consumer is connected and started so that it will prefetch msgs. - * - * An asynchrounous publisher is started to fill the broker with messages. - * - * We then wait to be notified of the disconnection via the ExceptionListener - * - * 0-10 does not have the same notification paths but sync() apparently should - * give us the exception, currently it doesn't, so the test is excluded from 0-10 - * - * We should ensure that this test has the same path for all protocol versions. - * - * Clients should not have to modify their code based on the protocol in use. - * - * @param ackMode @see javax.jms.Session - * - * @throws Exception - */ - protected void topicConsumer(int ackMode, boolean durable) throws Exception - { - Connection connection = getConnection(); - - connection.setExceptionListener(this); - - Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode); - - _destination = session.createTopic(getName()); - - MessageConsumer consumer; - - if (durable) - { - consumer = session.createDurableSubscriber(_destination, getTestQueueName()); - } - else - { - consumer = session.createConsumer(_destination); - } - - connection.start(); - - // Start the consumer pre-fetching - // Don't care about response as we will fill the broker up with messages - // after this point and ensure that the client is disconnected at the - // right point. - consumer.receiveNoWait(); - startPublisher(_destination); - - boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); - - if (!disconnected && isBroker010()) - { - try - { - ((AMQSession_0_10) session).sync(); - } - catch (AMQException amqe) - { - JMSException jmsException = new JMSException(amqe.getMessage()); - jmsException.setLinkedException(amqe); - jmsException.initCause(amqe); - _connectionException = jmsException; - } - } - - assertTrue("Client was not disconnected.", _connectionException != null); - - Exception linked = _connectionException.getLinkedException(); - - _publisher.join(JOIN_WAIT); - - assertFalse("Publisher still running", _publisher.isAlive()); - - //Validate publishing occurred ok - if (_publisherError != null) - { - throw _publisherError; - } - - // NOTE these exceptions will need to be modeled so that they are not - // 0-8 specific. e.g. JMSSessionClosedException - - assertNotNull("No error received onException listener.", _connectionException); - - assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); - - assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass()); - - AMQChannelClosedException ccException = (AMQChannelClosedException) linked; - - assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); - } - - - // Exception Listener - - public void onException(JMSException e) - { - _connectionException = e; - - e.printStackTrace(); - - _disconnectionLatch.countDown(); - } - - /// Connection Listener - - public void bytesSent(long count) - { - } - - public void bytesReceived(long count) - { - } - - public boolean preFailover(boolean redirect) - { - // Prevent Failover - return false; - } - - public boolean preResubscribe() - { - return false; - } - - public void failoverComplete() - { - } -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java deleted file mode 100644 index 09c849cfde..0000000000 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; - -import javax.naming.NamingException; -import java.io.IOException; - -/** - * This Topic test extends the Global queue test so it will run all the topic - * and subscription tests. - * - * We redefine the CONFIG_SECTION here so that the configuration is written - * against a topic element. - * - * To complete the migration to testing 'topic' elements we also override - * the setConfig to use the test name as the topic name. - * - */ -public class TopicTest extends GlobalQueuesTest -{ - private int _count=0; - - @Override - public void setUp() throws Exception - { - CONFIG_SECTION = ".topics.topic"; - super.setUp(); - } - - /** - * Add configuration for the queue that relates just to this test. - * We use the getTestQueueName() as our subscription. To ensure the - * config sections do not overlap we identify each section with a _count - * value. - * - * This would allow each test to configure more than one section. - * - * @param property to set - * @param value the value to set - * @param deleteDurable should deleteDurable be set. - * @throws NamingException - * @throws IOException - * @throws ConfigurationException - */ - @Override - public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException - { - setProperty(CONFIG_SECTION + "("+_count+").name", getName()); - - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - "policy.name", "TopicDelete"); - - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - property, value); - - if (deleteDurable) - { - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); - } - _count++; - } - - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java new file mode 100644 index 0000000000..dd63c9b698 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugin; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.ConversionException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin +{ + public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List getParentPaths() + { + return Arrays.asList("virtualhosts.virtualhost.slow-consumer-detection"); + } + } + + //Set Default time unit to seconds + TimeUnit _timeUnit = TimeUnit.SECONDS; + + public String[] getElementsProcessed() + { + return new String[]{"delay", + "timeunit"}; + } + + public long getDelay() + { + return getLongValue("delay", 10); + } + + public TimeUnit getTimeUnit() + { + return _timeUnit; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + validatePositiveLong("delay"); + + String timeUnit = getStringValue("timeunit"); + + if (timeUnit != null) + { + try + { + _timeUnit = TimeUnit.valueOf(timeUnit.toUpperCase()); + } + catch (IllegalArgumentException iae) + { + throw new ConfigurationException("Unable to configure Slow Consumer Detection invalid TimeUnit:" + timeUnit); + } + } + + System.out.println("Configured SCDC"); + System.out.println("Delay:" + getDelay()); + System.out.println("TimeUnit:" + getTimeUnit()); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java new file mode 100644 index 0000000000..8e2ecff6fb --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugin; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.Arrays; +import java.util.List; + +public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin +{ + public static class SlowConsumerDetectionPolicyConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionPolicyConfiguration slowConsumerConfig = new SlowConsumerDetectionPolicyConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List getParentPaths() + { + return Arrays.asList( + "virtualhosts.virtualhost.queues.slow-consumer-detection.policy", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy", + "virtualhosts.virtualhost.topics.slow-consumer-detection.policy", + "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy"); + } + } + + public String[] getElementsProcessed() + { + return new String[]{"name"}; + } + + public String getPolicyName() + { + return getStringValue("name"); + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + if (getPolicyName() == null) + { + throw new ConfigurationException("No Slow consumer policy defined."); + } + } + + @Override + public String formatToString() + { + return "Policy:"+getPolicyName(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java new file mode 100644 index 0000000000..58131760da --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugin; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin +{ + private SlowConsumerPolicyPlugin _policyPlugin; + + public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List getParentPaths() + { + return Arrays.asList( + "virtualhosts.virtualhost.queues.slow-consumer-detection", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection", + "virtualhosts.virtualhost.topics.slow-consumer-detection", + "virtualhosts.virtualhost.topics.topic.slow-consumer-detection"); + } + } + + public String[] getElementsProcessed() + { + return new String[]{"messageAge", + "depth", + "messageCount"}; + } + + public long getMessageAge() + { + return getLongValue("messageAge"); + } + + public long getDepth() + { + return getLongValue("depth"); + } + + public long getMessageCount() + { + return getLongValue("messageCount"); + } + + public SlowConsumerPolicyPlugin getPolicy() + { + return _policyPlugin; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + if (!containsPositiveLong("messageAge") && + !containsPositiveLong("depth") && + !containsPositiveLong("messageCount")) + { + throw new ConfigurationException("At least one configuration property" + + "('messageAge','depth' or 'messageCount') must be specified."); + } + + SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName()); + + PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); + Map factories = pluginManager.getSlowConsumerPlugins(); + + if (policyConfig == null) + { + throw new ConfigurationException("No Slow Consumer Policy specified. Known Policies:" + factories.keySet()); + } + + if (_logger.isDebugEnabled()) + { + Iterator keys = policyConfig.getConfig().getKeys(); + + while (keys.hasNext()) + { + String key = (String) keys.next(); + + _logger.debug("Policy Keys:" + key); + } + + } + + SlowConsumerPolicyPluginFactory pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase()); + + if (pluginFactory == null) + { + throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet()); + } + + _policyPlugin = pluginFactory.newInstance(policyConfig); + + // Debug the creation of this Config + _logger.debug(this); + } + + public String formatToString() + { + StringBuilder sb = new StringBuilder(); + if (getMessageAge() > 0) + { + sb.append("Age:").append(getMessageAge()).append(":"); + } + if (getDepth() > 0) + { + sb.append("Depth:").append(getDepth()).append(":"); + } + if (getMessageCount() > 0) + { + sb.append("Count:").append(getMessageCount()).append(":"); + } + + sb.append("Policy[").append(getPolicy()).append("]"); + return sb.toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index 717f0d1bee..97c43b940b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -35,6 +35,9 @@ import org.apache.felix.framework.util.StringMap; import org.apache.log4j.Logger; import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.TopicConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.security.SecurityManager; @@ -43,6 +46,9 @@ import org.apache.qpid.server.security.access.plugins.AllowAll; import org.apache.qpid.server.security.access.plugins.DenyAll; import org.apache.qpid.server.security.access.plugins.LegacyAccess; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; +import org.apache.qpid.server.virtualhost.plugin.SlowConsumerDetection; +import org.apache.qpid.server.virtualhost.plugin.policies.TopicDeletePolicy; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleException; import org.osgi.framework.launch.Framework; @@ -57,7 +63,6 @@ public class PluginManager implements Closeable private static final Logger _logger = Logger.getLogger(PluginManager.class); private static final int FELIX_STOP_TIMEOUT = 30000; - private static final String VERSION = "2.6.0.4"; private Framework _felix; @@ -65,11 +70,14 @@ public class PluginManager implements Closeable private ServiceTracker _securityTracker = null; private ServiceTracker _configTracker = null; private ServiceTracker _virtualHostTracker = null; + private ServiceTracker _policyTracker = null; private Activator _activator; private Map _securityPlugins = new HashMap(); private Map, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap, ConfigurationPluginFactory>(); + private Map _vhostPlugins = new HashMap(); + private Map _policyPlugins = new HashMap(); public PluginManager(String pluginPath, String cachePath) throws Exception { @@ -85,10 +93,23 @@ public class PluginManager implements Closeable SecurityManager.SecurityConfiguration.FACTORY, AllowAll.AllowAllConfiguration.FACTORY, DenyAll.DenyAllConfiguration.FACTORY, - LegacyAccess.LegacyAccessConfiguration.FACTORY)) + LegacyAccess.LegacyAccessConfiguration.FACTORY, + new SlowConsumerDetectionConfigurationFactory(), + new SlowConsumerDetectionPolicyConfigurationFactory(), + new SlowConsumerDetectionQueueConfigurationFactory())) { _configPlugins.put(configFactory.getParentPaths(), configFactory); } + for (SlowConsumerPolicyPluginFactory pluginFactory : Arrays.asList( + new TopicDeletePolicy.TopicDeletePolicyFactory())) + { + _policyPlugins.put(pluginFactory.getPluginName(), pluginFactory); + } + for (VirtualHostPluginFactory pluginFactory : Arrays.asList( + new SlowConsumerDetection.SlowConsumerFactory())) + { + _vhostPlugins.put(pluginFactory.getClass().getName(), pluginFactory); + } // Check the plugin directory path is set and exist if (pluginPath == null) @@ -117,6 +138,7 @@ public class PluginManager implements Closeable "org.apache.qpid.common; version=0.7," + "org.apache.qpid.exchange; version=0.7," + "org.apache.qpid.framing; version=0.7," + + "org.apache.qpid.management.common.mbeans.annotations; version=0.7," + "org.apache.qpid.protocol; version=0.7," + "org.apache.qpid.server.binding; version=0.7," + "org.apache.qpid.server.configuration; version=0.7," + @@ -157,7 +179,7 @@ public class PluginManager implements Closeable configMap.put(SYSTEMBUNDLE_ACTIVATORS_PROP, activators); if (cachePath != null) - { + { File cacheDir = new File(cachePath); if (!cacheDir.exists() && cacheDir.canWrite()) { @@ -204,12 +226,11 @@ public class PluginManager implements Closeable _virtualHostTracker = new ServiceTracker(_activator.getContext(), VirtualHostPluginFactory.class.getName(), null); _virtualHostTracker.open(); - - _logger.info("Opened service trackers"); + + _policyTracker = new ServiceTracker(_activator.getContext(), SlowConsumerPolicyPluginFactory.class.getName(), null); + _policyTracker.open(); - // Load security and configuration plugins from their trackers for access - _configPlugins.putAll(getConfigurationServices()); - _securityPlugins.putAll(getPlugins(SecurityPluginFactory.class)); + _logger.info("Opened service trackers"); } private static Map getServices(ServiceTracker tracker) @@ -234,11 +255,18 @@ public class PluginManager implements Closeable return services; } - private Map, ConfigurationPluginFactory> getConfigurationServices() + public static Map getServices(ServiceTracker tracker, Map plugins) + { + Map services = getServices(tracker); + services.putAll(plugins); + return services; + } + + public Map, ConfigurationPluginFactory> getConfigurationPlugins() { Map, ConfigurationPluginFactory> services = new IdentityHashMap, ConfigurationPluginFactory>(); - if (_configTracker.getServices() != null) + if (_configTracker != null && _configTracker.getServices() != null) { for (Object service : _configTracker.getServices()) { @@ -246,49 +274,30 @@ public class PluginManager implements Closeable services.put(factory.getParentPaths(), factory); } } + + services.putAll(_configPlugins); return services; } - public Map> getExchanges() - { - return getServices(_exchangeTracker); + public Map getVirtualHostPlugins() + { + return getServices(_virtualHostTracker, _vhostPlugins); } - public Map getVirtualHostPlugins() - { - return getServices(_virtualHostTracker); + public Map getSlowConsumerPlugins() + { + return getServices(_policyTracker, _policyPlugins); } - public

> Map getPlugins(Class

plugin) + public Map> getExchanges() { - // If plugins are not configured then return an empty set - if (_activator == null) - { - return new HashMap(); - } - - ServiceTracker tracker = new ServiceTracker(_activator.getContext(), plugin.getName(), null); - tracker.open(); - - try - { - return getServices(tracker); - } - finally - { - tracker.close(); - } + return getServices(_exchangeTracker); } public Map getSecurityPlugins() { - return _securityPlugins; - } - - public Map, ConfigurationPluginFactory> getConfigurationPlugins() - { - return _configPlugins; + return getServices(_securityTracker, _securityPlugins); } public void close() @@ -302,6 +311,7 @@ public class PluginManager implements Closeable _securityTracker.close(); _configTracker.close(); _virtualHostTracker.close(); + _policyTracker.close(); } finally { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java new file mode 100644 index 0000000000..d947e9a367 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java @@ -0,0 +1,86 @@ +package org.apache.qpid.server.virtualhost.plugin; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; +import org.apache.qpid.server.exchange.AbstractExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.Exchange.BindingListener; +import org.apache.qpid.server.queue.AMQQueue; + +/** + * This is a listener that caches queues that are configured for slow consumer disconnection. + * + * There should be one listener per virtual host, which can be added to all exchanges on + * that host. + * + * TODO In future, it will be possible to configure the policy at runtime, so only the queue + * itself is cached, and the configuration looked up by the housekeeping thread. This means + * that there may be occasions where the copy of the cache contents retrieved by the thread + * does not contain queues that are configured, or that configured queues are not present. + * + * @see BindingListener + */ +public class ConfiguredQueueBindingListener implements BindingListener +{ + private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class); + + private String _vhostName; + private Set _cache = Collections.synchronizedSet(new HashSet()); + + public ConfiguredQueueBindingListener(String vhostName) + { + _vhostName = vhostName; + } + + /** + * @see BindingListener#bindingAdded(Exchange, Binding) + */ + public void bindingAdded(Exchange exchange, Binding binding) + { + processBinding(binding); + } + + /** + * @see BindingListener#bindingRemoved(Exchange, Binding) + */ + public void bindingRemoved(Exchange exchange, Binding binding) + { + processBinding(binding); + } + + private void processBinding(Binding binding) + { + AMQQueue queue = binding.getQueue(); + + SlowConsumerDetectionQueueConfiguration config = + queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); + if (config != null) + { + _cache.add(queue); + } + else + { + _cache.remove(queue); + } + } + + /** + * Lookup and return the cache of configured {@link AMQQueue}s. + * + * Note that when accessing the cached queues, the {@link Iterator} is not thread safe + * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the + * cache is returned. + * + * @return a copy of the cached {@link java.util.Set} of queues + */ + public Set getQueueCache() + { + return new HashSet(_cache); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java new file mode 100644 index 0000000000..6acb0bc11e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java @@ -0,0 +1,161 @@ +/* + * + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.plugins.Plugin; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.plugin.logging.SlowConsumerDetectionMessages; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostHouseKeepingPlugin; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; + +public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin +{ + private SlowConsumerDetectionConfiguration _config; + private ConfiguredQueueBindingListener _listener; + + public static class SlowConsumerFactory implements VirtualHostPluginFactory + { + public SlowConsumerDetection newInstance(VirtualHost vhost) + { + SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName()); + + if (config == null) + { + return null; + } + + SlowConsumerDetection plugin = new SlowConsumerDetection(vhost); + plugin.configure(config); + return plugin; + } + } + + /** + * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this + * cirtual host to record all the configured queues in a cache for processing by the housekeeping + * thread. + * + * @see Plugin#configure(ConfigurationPlugin) + */ + public void configure(ConfigurationPlugin config) + { + _config = (SlowConsumerDetectionConfiguration) config; + _listener = new ConfiguredQueueBindingListener(_virtualhost.getName()); + for (AMQShortString exchangeName : _virtualhost.getExchangeRegistry().getExchangeNames()) + { + _virtualhost.getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener); + } + } + + public SlowConsumerDetection(VirtualHost vhost) + { + super(vhost); + } + + public void execute() + { + CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING()); + + Set cache = _listener.getQueueCache(); + for (AMQQueue q : cache) + { + CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName())); + + try + { + SlowConsumerDetectionQueueConfiguration config = + q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); + if (checkQueueStatus(q, config)) + { + config.getPolicy().performPolicy(q); + } + } + catch (Exception e) + { + // Don't throw exceptions as this will stop the house keeping task from running. + _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e); + } + } + + CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE()); + } + + public long getDelay() + { + return _config.getDelay(); + } + + public TimeUnit getTimeUnit() + { + return _config.getTimeUnit(); + } + + /** + * Check the depth,messageSize,messageAge,messageCount values for this q + * + * @param q the queue to check + * @param config the queue configuration to compare against the queue state + * + * @return true if the queue has reached a threshold. + */ + private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) + { + if (config != null) + { + _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + + int count = q.getMessageCount(); + + // First Check message counts + if ((config.getMessageCount() != 0 && count >= config.getMessageCount()) || + // The check queue depth + (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) || + // finally if we have messages on the queue check Arrival time. + // We must check count as OldestArrival time is Long.MAX_LONG when + // there are no messages. + (config.getMessageAge() != 0 && + ((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge()))) + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("Detected Slow Consumer on Queue(" + q.getName() + ")"); + _logger.debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount()); + _logger.debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()); + _logger.debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()); + } + + return true; + } + } + return false; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties new file mode 100644 index 0000000000..2714935a71 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties @@ -0,0 +1,4 @@ +#SlowConsumerDetection.logMessages +RUNNING = SCD-1001 : Running +COMPLETE = SCD-1002 : Complete +CHECKING_QUEUE = SCD-1003 : Checking Status of Queue {0} \ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties new file mode 100644 index 0000000000..d0f5965c39 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties @@ -0,0 +1,3 @@ +#TopicDeletePolicy.logMessages +DELETING_QUEUE = TDP-1001 : Deleting Queue +DISCONNECTING = TDP-1002 : Disconnecting Session \ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java new file mode 100644 index 0000000000..3bd4ae8d4e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java @@ -0,0 +1,141 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.policies; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.plugin.logging.TopicDeletePolicyMessages; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; + +public class TopicDeletePolicy implements SlowConsumerPolicyPlugin +{ + Logger _logger = Logger.getLogger(TopicDeletePolicy.class); + private TopicDeletePolicyConfiguration _configuration; + + public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory + { + public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException + { + TopicDeletePolicyConfiguration config = + configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName()); + + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(config); + return policy; + } + + public String getPluginName() + { + return "topicdelete"; + } + + public Class getPluginClass() + { + return TopicDeletePolicy.class; + } + } + + public void performPolicy(AMQQueue q) + { + if (q == null) + { + return; + } + + AMQSessionModel owner = q.getExclusiveOwningSession(); + + // Only process exclusive queues + if (owner == null) + { + return; + } + + //Only process Topics + if (!validateQueueIsATopic(q)) + { + return; + } + + try + { + CurrentActor.get().message(owner.getLogSubject(),TopicDeletePolicyMessages.DISCONNECTING()); + // Close the consumer . this will cause autoDelete Queues to be purged + owner.getConnectionModel(). + closeSession(owner, AMQConstant.RESOURCE_ERROR, + "Consuming to slow."); + + // Actively delete non autoDelete queues if deletePersistent is set + if (!q.isAutoDelete() && (_configuration != null && _configuration.deletePersistent())) + { + CurrentActor.get().message(q.getLogSubject(), TopicDeletePolicyMessages.DELETING_QUEUE()); + q.delete(); + } + + } + catch (AMQException e) + { + _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName()); + } + + } + + /** + * Check the queue bindings to validate the queue is bound to the + * topic exchange. + * + * @param q the Queue + * + * @return true iff Q is bound to a TopicExchange + */ + private boolean validateQueueIsATopic(AMQQueue q) + { + for (Binding binding : q.getBindings()) + { + if (binding.getExchange() instanceof TopicExchange) + { + return true; + } + } + + return false; + } + + public void configure(ConfigurationPlugin config) + { + _configuration = (TopicDeletePolicyConfiguration) config; + } + + @Override + public String toString() + { + return "TopicDelete" + (_configuration == null ? "" : "[" + _configuration + "]"); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java new file mode 100644 index 0000000000..e6ad1cbcc3 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.policies; + +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +public class TopicDeletePolicyConfiguration extends ConfigurationPlugin +{ + + public static class TopicDeletePolicyConfigurationFactory + implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, + Configuration config) + throws ConfigurationException + { + TopicDeletePolicyConfiguration slowConsumerConfig = + new TopicDeletePolicyConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public List getParentPaths() + { + return Arrays.asList( + "virtualhosts.virtualhost.queues.slow-consumer-detection.policy.topicDelete", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete", + "virtualhosts.virtualhost.topics.slow-consumer-detection.policy.topicDelete", + "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy.topicDelete"); + } + } + + public String[] getElementsProcessed() + { + return new String[]{"delete-persistent"}; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + // No validation required. + } + + public boolean deletePersistent() + { + // If we don't have configuration then we don't deletePersistent Queues + return (hasConfiguration() && contains("delete-persistent")); + } + + @Override + public String formatToString() + { + return (deletePersistent()?"delete-durable":""); + } + + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java new file mode 100644 index 0000000000..7f600abdc9 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.slowconsumerdetection.policies; + +import org.apache.qpid.server.plugins.Plugin; +import org.apache.qpid.server.queue.AMQQueue; + +public interface SlowConsumerPolicyPlugin extends Plugin +{ + public void performPolicy(AMQQueue Queue); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java new file mode 100644 index 0000000000..b2fe6766a6 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.slowconsumerdetection.policies; + +import org.apache.qpid.server.plugins.PluginFactory; + +public interface SlowConsumerPolicyPluginFactory

extends PluginFactory

+{ +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java new file mode 100644 index 0000000000..40dc382d30 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java @@ -0,0 +1,346 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +import java.util.concurrent.TimeUnit; + +/** + * Provide Unit Test coverage of the virtualhost SlowConsumer Configuration + * This is what controls how often the plugin will execute + */ +public class SlowConsumerDetectionConfigurationTest extends InternalBrokerBaseCase +{ + + /** + * Default Testing: + * + * Provide a fully complete and valid configuration specifying 'delay' and + * 'timeunit' and ensure that it is correctly processed. + * + * Ensure no exceptions are thrown and that we get the same values back that + * were put into the configuration. + */ + public void testConfigLoadingValidConfig() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + long DELAY=10; + String TIMEUNIT=TimeUnit.MICROSECONDS.toString(); + xmlconfig.addProperty("delay", String.valueOf(DELAY)); + xmlconfig.addProperty("timeunit", TIMEUNIT); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + } + catch (ConfigurationException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); + assertEquals("TimeUnit not correctly returned.", + TIMEUNIT, String.valueOf(config.getTimeUnit())); + } + + /** + * Default Testing: + * + * Test Missing TimeUnit value gets default. + * + * The TimeUnit value is optional and default to SECONDS. + * + * Test that if we do not specify a TimeUnit then we correctly get seconds. + * + * Also verify that relying on the default does not impact the setting of + * the 'delay' value. + * + */ + public void testConfigLoadingMissingTimeUnitDefaults() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + long DELAY=10; + xmlconfig.addProperty("delay", String.valueOf(DELAY)); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + try + { + config.setConfiguration("", composite); + } + catch (ConfigurationException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); + assertEquals("Default TimeUnit incorrect", TimeUnit.SECONDS, config.getTimeUnit()); + } + + /** + * Input Testing: + * + * TimeUnit parsing requires the String value be in UpperCase. + * Ensure we can handle when the user doesn't know this. + * + * Same test as 'testConfigLoadingValidConfig' but checking that + * the timeunit field is not case sensitive. + * i.e. the toUpper is being correctly applied. + */ + public void testConfigLoadingValidConfigStrangeTimeUnit() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + long DELAY=10; + + xmlconfig.addProperty("delay", DELAY); + xmlconfig.addProperty("timeunit", "MiCrOsEcOnDs"); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + } + catch (ConfigurationException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); + assertEquals("TimeUnit not correctly returned.", + TimeUnit.MICROSECONDS.toString(), String.valueOf(config.getTimeUnit())); + + } + + /** + * Failure Testing: + * + * Test that delay must be long not a string value. + * Provide a delay as a written value not a long. 'ten'. + * + * This should throw a configuration exception which is being trapped and + * verified to be the right exception, a NumberFormatException. + * + */ + public void testConfigLoadingInValidDelayString() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("delay", "ten"); + xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("Configuration should fail to validate"); + } + catch (ConfigurationException e) + { + Throwable cause = e.getCause(); + + assertEquals("Cause not correct", NumberFormatException.class, cause.getClass()); + } + } + + /** + * Failure Testing: + * + * Test that negative delays are invalid. + * + * Delay must be a positive value as negative delay means doesn't make sense. + * + * Configuration exception with a useful message should be thrown here. + * + */ + public void testConfigLoadingInValidDelayNegative() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("delay", "-10"); + xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("Configuration should fail to validate"); + } + catch (ConfigurationException e) + { + Throwable cause = e.getCause(); + + assertNotNull("Configuration Exception must not be null.", cause); + assertEquals("Cause not correct", + ConfigurationException.class, cause.getClass()); + assertEquals("Incorrect message.", + "SlowConsumerDetectionConfiguration: 'delay' must be a Positive Long value.", + cause.getMessage()); + } + } + + /** + * Failure Testing: + * + * Test that delay cannot be 0. + * + * A zero delay means run constantly. This is not how VirtualHostTasks + * are designed to be run so we dis-allow the use of 0 delay. + * + * Same test as 'testConfigLoadingInValidDelayNegative' but with a 0 value. + * + */ + public void testConfigLoadingInValidDelayZero() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("delay", "0"); + xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("Configuration should fail to validate"); + } + catch (ConfigurationException e) + { + Throwable cause = e.getCause(); + + assertNotNull("Configuration Exception must not be null.", cause); + assertEquals("Cause not correct", + ConfigurationException.class, cause.getClass()); + assertEquals("Incorrect message.", + "SlowConsumerDetectionConfiguration: 'delay' must be a Positive Long value.", + cause.getMessage()); + } + } + + /** + * Failure Testing: + * + * Test that missing delay fails. + * If we have no delay then we do not pick a default. So a Configuration + * Exception is thrown. + * + * */ + public void testConfigLoadingInValidMissingDelay() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("timeunit", TimeUnit.SECONDS.toString()); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + try + { + config.setConfiguration("", composite); + fail("Configuration should fail to validate"); + } + catch (ConfigurationException e) + { + assertEquals("Incorrect message.", "SlowConsumerDetectionConfiguration: unable to configure invalid delay:null", e.getMessage()); + } + } + + /** + * Failure Testing: + * + * Test that erroneous TimeUnit fails. + * + * Valid TimeUnit values vary based on the JVM version i.e. 1.6 added HOURS/DAYS etc. + * + * We don't test the values for TimeUnit are accepted other than MILLISECONDS in the + * positive testing at the start. + * + * Here we ensure that an erroneous for TimeUnit correctly throws an exception. + * + * We test with 'foo', which will never be a TimeUnit + * + */ + public void testConfigLoadingInValidTimeUnit() + { + SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); + + String TIMEUNIT = "foo"; + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("delay", "10"); + xmlconfig.addProperty("timeunit", TIMEUNIT); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + try + { + config.setConfiguration("", composite); + fail("Configuration should fail to validate"); + } + catch (ConfigurationException e) + { + assertEquals("Incorrect message.", "Unable to configure Slow Consumer Detection invalid TimeUnit:" + TIMEUNIT, e.getMessage()); + } + } + + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java new file mode 100644 index 0000000000..67c177f099 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +/** + * Test class to ensure that the policy configuration can be processed. + */ +public class SlowConsumerDetectionPolicyConfigurationTest extends InternalBrokerBaseCase +{ + + /** + * Input Testing: + * + * Test that a given String can be set and retrieved through the configuration + * + * No validation is being performed to ensure that the policy exists. Only + * that a value can be set for the policy. + * + */ + public void testConfigLoadingValidConfig() + { + SlowConsumerDetectionPolicyConfiguration config = new SlowConsumerDetectionPolicyConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + String policyName = "TestPolicy"; + xmlconfig.addProperty("name", policyName); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + } + catch (ConfigurationException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + + assertEquals("Policy name not retrieved as expected.", + policyName, config.getPolicyName()); + } + + /** + * Failure Testing: + * + * Test that providing a configuration section without the 'name' field + * causes an exception to be thrown. + * + * An empty configuration is provided and the thrown exception message + * is checked to confirm the right reason. + * + */ + public void testConfigLoadingInValidConfig() + { + SlowConsumerDetectionPolicyConfiguration config = new SlowConsumerDetectionPolicyConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("Config is invalid so won't validate."); + } + catch (ConfigurationException e) + { + e.printStackTrace(); + assertEquals("Exception message not as expected.", "No Slow consumer policy defined.", e.getMessage()); + } + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java new file mode 100644 index 0000000000..23828d5c61 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.virtualhost.plugin; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +/** + * Unit test the QueueConfiguration processing. + * + * This is slightly awkward as the {@link SlowConsumerDetectionQueueConfiguration} + * requries that a policy be available. + *

+ * So all the Valid test much catch the ensuing {@link ConfigurationException} and + * validate that the error is due to a lack of a valid policy. + */ +public class SlowConsumerDetectionQueueConfigurationTest extends InternalBrokerBaseCase +{ + /** + * Test a fully loaded configuration file. + * + * It is not an error to have all control values specified. + *

+ * Here we need to catch the {@link ConfigurationException} that ensues due to lack + * of a policy plugin. + */ + public void testConfigLoadingValidConfig() + { + SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("messageAge", "60000"); + xmlconfig.addProperty("depth", "1024"); + xmlconfig.addProperty("messageCount", "10"); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("No Policies are avaialbe to load in a unit test"); + } + catch (ConfigurationException e) + { + assertTrue("Exception message incorrect, was: " + e.getMessage(), + e.getMessage().startsWith("No Slow Consumer Policy specified. Known Policies:[")); + } + } + + /** + * When we do not specify any control value then a {@link ConfigurationException} + * must be thrown to remind us. + */ + public void testConfigLoadingMissingConfig() + { + SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("No Policies are avaialbe to load in a unit test"); + } + catch (ConfigurationException e) + { + + assertEquals("At least one configuration property('messageAge','depth'" + + " or 'messageCount') must be specified.", e.getMessage()); + } + } + + /** + * Setting messageAge on its own is enough to have a valid configuration + * + * Here we need to catch the {@link ConfigurationException} that ensues due to lack + * of a policy plugin. + */ + public void testConfigLoadingMessageAgeOk() + { + SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + xmlconfig.addProperty("messageAge", "60000"); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("No Policies are avaialbe to load in a unit test"); + } + catch (ConfigurationException e) + { + assertTrue("Exception message incorrect, was: " + e.getMessage(), + e.getMessage().startsWith("No Slow Consumer Policy specified. Known Policies:[")); + } + } + + /** + * Setting depth on its own is enough to have a valid configuration. + * + * Here we need to catch the {@link ConfigurationException} that ensues due to lack + * of a policy plugin. + */ + public void testConfigLoadingDepthOk() + { + SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + xmlconfig.addProperty("depth", "1024"); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("No Policies are avaialbe to load in a unit test"); + } + catch (ConfigurationException e) + { + assertTrue("Exception message incorrect, was: " + e.getMessage(), + e.getMessage().startsWith("No Slow Consumer Policy specified. Known Policies:[")); + } + } + + /** + * Setting messageCount on its own is enough to have a valid configuration. + * + * Here we need to catch the {@link ConfigurationException} that ensues due to lack + * of a policy plugin. + */ + public void testConfigLoadingMessageCountOk() + { + SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + xmlconfig.addProperty("messageCount", "10"); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("", composite); + fail("No Policies are avaialbe to load in a unit test"); + } + catch (ConfigurationException e) + { + assertTrue("Exception message incorrect, was: " + e.getMessage(), + e.getMessage().startsWith("No Slow Consumer Policy specified. Known Policies:[")); + } + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java new file mode 100644 index 0000000000..8b729a0f43 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.policies; + +import org.apache.commons.configuration.CompositeConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +/** + * Test to ensure TopicDelete Policy configuration can be loaded. + */ +public class TopicDeletePolicyConfigurationTest extends InternalBrokerBaseCase +{ + /** + * Test without any configuration being provided that the + * deletePersistent option is disabled. + */ + public void testNoConfigNoDeletePersistent() + { + TopicDeletePolicyConfiguration config = new TopicDeletePolicyConfiguration(); + + assertFalse("TopicDelete Configuration with no config should not delete persistent queues.", + config.deletePersistent()); + } + + /** + * Test that with the correct configuration the deletePersistent option can + * be enabled. + * + * Test creates a new Configuration object and passes in the xml snippet + * that the ConfigurationPlugin would receive during normal execution. + * This is the XML that would be matched for this plugin: + * + * + * + * + * So it would be subset and passed in as just: + * + * + * + * The property should therefore be enabled. + * + */ + public void testConfigDeletePersistent() + { + TopicDeletePolicyConfiguration config = new TopicDeletePolicyConfiguration(); + + XMLConfiguration xmlconfig = new XMLConfiguration(); + + xmlconfig.addProperty("delete-persistent",""); + + // Create a CompositeConfiguration as this is what the broker uses + CompositeConfiguration composite = new CompositeConfiguration(); + composite.addConfiguration(xmlconfig); + + try + { + config.setConfiguration("",composite); + } + catch (ConfigurationException e) + { + fail(e.getMessage()); + } + + assertTrue("A configured TopicDelete should delete persistent queues.", + config.deletePersistent()); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java new file mode 100644 index 0000000000..364766dfa7 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java @@ -0,0 +1,293 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.policies; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockAMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class TopicDeletePolicyTest extends InternalBrokerBaseCase +{ + + TopicDeletePolicyConfiguration _config; + + VirtualHost _defaultVhost; + InternalTestProtocolSession _connection; + + public void setUp() throws Exception + { + super.setUp(); + + _defaultVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getDefaultVirtualHost(); + + _connection = new InternalTestProtocolSession(_defaultVhost); + + _config = new TopicDeletePolicyConfiguration(); + + XMLConfiguration config = new XMLConfiguration(); + + _config.setConfiguration("", config); + } + + private MockAMQQueue createOwnedQueue() + { + MockAMQQueue queue = new MockAMQQueue("testQueue"); + + _defaultVhost.getQueueRegistry().registerQueue(queue); + + try + { + AMQChannel channel = new AMQChannel(_connection, 0, null); + _connection.addChannel(channel); + + queue.setExclusiveOwningSession(channel); + } + catch (AMQException e) + { + fail("Unable to create Channel:" + e.getMessage()); + } + + return queue; + } + + private void setQueueToAutoDelete(final AMQQueue queue) + { + ((MockAMQQueue) queue).setAutoDelete(true); + + queue.setDeleteOnNoConsumers(true); + final AMQProtocolSession.Task deleteQueueTask = + new AMQProtocolSession.Task() + { + public void doTask(AMQProtocolSession session) throws AMQException + { + queue.delete(); + } + }; + + ((AMQChannel) queue.getExclusiveOwningSession()).getProtocolSession().addSessionCloseTask(deleteQueueTask); + } + + /** Check that a null queue passed in does not upset the policy. */ + public void testNullQueueParameter() throws ConfigurationException + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + try + { + policy.performPolicy(null); + } + catch (Exception e) + { + fail("Exception should not be thrown:" + e.getMessage()); + } + + } + + /** + * Set a owning Session to null which means this is not an exclusive queue + * so the queue should not be deleted + */ + public void testNonExclusiveQueue() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.setExclusiveOwningSession(null); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertFalse("Connection should not be closed", _connection.isClosed()); + } + + /** + * Test that exclusive JMS Queues are not deleted. + * Bind the queue to the direct exchange (so it is a JMS Queue). + * + * JMS Queues are not to be processed so this should not delete the queue. + */ + public void testQueuesAreNotProcessed() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new DirectExchange(), null)); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertFalse("Connection should not be closed", _connection.isClosed()); + } + + /** + * Give a non auto-delete queue is bound to the topic exchange the + * TopicDeletePolicy will close the connection and delete the queue, + */ + public void testNonAutoDeleteTopicIsNotClosed() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + queue.setAutoDelete(false); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a auto-delete queue bound to the topic exchange the TopicDeletePolicy will + * close the connection and delete the queue + */ + public void testTopicIsClosed() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + final MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + setQueueToAutoDelete(queue); + + policy.performPolicy(queue); + + assertTrue("Queue should be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a queue bound to the topic exchange the TopicDeletePolicy will + * close the connection and NOT delete the queue + */ + public void testNonAutoDeleteTopicIsClosedNotDeleted() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a queue bound to the topic exchange the TopicDeletePolicy suitably + * configured with the delete-persistent tag will close the connection + * and delete the queue + */ + public void testPersistentTopicIsClosedAndDeleted() + { + //Set the config to delete persistent queues + _config.getConfig().addProperty("delete-persistent", ""); + + TopicDeletePolicy policy = new TopicDeletePolicy(); + policy.configure(_config); + + assertTrue("Config was not updated to delete Persistent topics", + _config.deletePersistent()); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + policy.performPolicy(queue); + + assertTrue("Queue should be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + /** + * Give a queue bound to the topic exchange the TopicDeletePolicy not + * configured to close a persistent queue + */ + public void testPersistentTopicIsClosedAndDeletedNullConfig() + { + TopicDeletePolicy policy = new TopicDeletePolicy(); + // Explicity say we are not configuring the policy. + policy.configure(null); + + MockAMQQueue queue = createOwnedQueue(); + + queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); + + policy.performPolicy(queue); + + assertFalse("Queue should not be deleted", queue.isDeleted()); + assertTrue("Connection should be closed", _connection.isClosed()); + } + + public void testNonExclusiveQueueNullConfig() + { + _config = null; + testNonExclusiveQueue(); + } + + public void testQueuesAreNotProcessedNullConfig() + { + _config = null; + testQueuesAreNotProcessed(); + } + + public void testNonAutoDeleteTopicIsNotClosedNullConfig() + { + _config = null; + testNonAutoDeleteTopicIsNotClosed(); + } + + public void testTopicIsClosedNullConfig() + { + _config = null; + testTopicIsClosed(); + } + + public void testNonAutoDeleteTopicIsClosedNotDeletedNullConfig() throws AMQException + { + _config = null; + testNonAutoDeleteTopicIsClosedNotDeleted(); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java new file mode 100644 index 0000000000..e0934faf44 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalQueuesTest.java @@ -0,0 +1,222 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; + +import javax.jms.Session; +import javax.naming.NamingException; +import java.io.IOException; + +/** + * QPID-1447 : Add slow consumer detection and disconnection. + * + * Slow consumers should on a topic should expect to receive a + * 506 : Resource Error if the hit a predefined threshold. + */ +public class GlobalQueuesTest extends TestingBaseCase +{ + + protected String CONFIG_SECTION = ".queues"; + + /** + * Queue Configuration + + + + 4235264 + + + 600000 + + + 50 + + + + TopicDelete + + + + + + + */ + + /** + * VirtualHost Plugin Configuration + + + 1 + MINUTES + + + */ + + public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException + { + setProperty(CONFIG_SECTION + ".slow-consumer-detection." + + "policy.name", "TopicDelete"); + + setProperty(CONFIG_SECTION + ".slow-consumer-detection." + + property, value); + + if (deleteDurable) + { + setProperty(CONFIG_SECTION + ".slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + } + } + + /** + * Test that setting messageCount takes affect on topics + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + public void testTopicConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), false); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting depth has an effect on topics + * + * Sets the message size for the test + * Sets the depth to be 9 * the depth + * Ensure that sending 10 messages causes the disconnection + * + * @throws Exception + */ + public void testTopicConsumerMessageSize() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), false); + + //Start the broker + startBroker(); + + setMessageSize(MESSAGE_SIZE); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting messageAge has an effect on topics + * + * Sets the messageAge to be half the disconnection wait timeout + * Send 10 messages and then ensure that we get disconnected as we will + * wait for the full timeout. + * + * @throws Exception + */ + public void testTopicConsumerMessageAge() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 2), false); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting messageCount takes affect on a durable Consumer + * + * Ensure we set the delete-persistent option + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + + public void testTopicDurableConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting depth has an effect on durable consumer topics + * + * Ensure we set the delete-persistent option + * + * Sets the message size for the test + * Sets the depth to be 9 * the depth + * Ensure that sending 10 messages causes the disconnection + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageSize() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true); + + //Start the broker + startBroker(); + + setMessageSize(MESSAGE_SIZE); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting messageAge has an effect on topics + * + * Ensure we set the delete-persistent option + * + * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) + * Send 10 messages and then ensure that we get disconnected as we will + * wait for the full timeout. + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageAge() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java new file mode 100644 index 0000000000..aff5d1b1b8 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/GlobalTopicsTest.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; + +import javax.naming.NamingException; +import java.io.IOException; + +public class GlobalTopicsTest extends GlobalQueuesTest +{ + @Override + public void setUp() throws Exception + { + CONFIG_SECTION = ".topics"; + super.setUp(); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java new file mode 100644 index 0000000000..e4efac60f8 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/MergeConfigurationTest.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.NamingException; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class MergeConfigurationTest extends TestingBaseCase +{ + + protected int topicCount = 0; + + + public void configureTopic(String topic, int msgCount) throws NamingException, IOException, ConfigurationException + { + + setProperty(".topics.topic("+topicCount+").name", topic); + setProperty(".topics.topic("+topicCount+").slow-consumer-detection.messageCount", String.valueOf(msgCount)); + setProperty(".topics.topic("+topicCount+").slow-consumer-detection.policy.name", "TopicDelete"); + topicCount++; + } + + + /** + * Test that setting messageCount takes affect on topics + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + public void testTopicConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT * 4) - 1); + + //Configure topic as a subscription + setProperty(".topics.topic("+topicCount+").subscriptionName", "clientid:"+getTestQueueName()); + configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT - 1)); + + + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + +// +// public void testMerge() throws ConfigurationException, AMQException +// { +// +// AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"), false, new AMQShortString("testowner"), +// false, false, _virtualHost, null); +// +// _virtualHost.getQueueRegistry().registerQueue(queue); +// Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); +// _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null); +// +// +// Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME); +// _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange, null); +// +// TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName()); +// +// assertNotNull("Queue should have topic configuration bound to it.", config); +// assertEquals("Configuration name not correct", getName() + ":stockSubscription", config.getSubscriptionName()); +// +// ConfigurationPlugin scdConfig = queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); +// if (scdConfig instanceof org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration) +// { +// System.err.println("********************** scd is a SlowConsumerDetectionQueueConfiguration."); +// } +// else +// { +// System.err.println("********************** Test SCD "+SlowConsumerDetectionQueueConfiguration.class.getClassLoader()); +// System.err.println("********************** Broker SCD "+scdConfig.getClass().getClassLoader()); +// System.err.println("********************** Broker SCD "+scdConfig.getClass().isAssignableFrom(SlowConsumerDetectionQueueConfiguration.class)); +// System.err.println("********************** is a "+scdConfig.getClass()); +// } +// +// assertNotNull("Queue should have scd configuration bound to it.", scdConfig); +// assertEquals("MessageCount is not correct", 10 , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getMessageCount()); +// assertEquals("Policy is not correct", TopicDeletePolicy.class.getName() , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getPolicy().getClass().getName()); +// } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java new file mode 100644 index 0000000000..9e9375fd44 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/SubscriptionTest.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; + +import javax.jms.Session; +import javax.naming.NamingException; +import java.io.IOException; + +/** + * Test SCD when configured with Subscription details. + * + * We run the subscription based tests here to validate that the + * subscriptionname value is correctly associated with the subscription. + * + * + */ +public class SubscriptionTest extends TestingBaseCase +{ + private int _count=0; + protected String CONFIG_SECTION = ".topics.topic"; + + /** + * Add configuration for the queue that relates just to this test. + * We use the getTestQueueName() as our subscription. To ensure the + * config sections do not overlap we identify each section with a _count + * value. + * + * This would allow each test to configure more than one section. + * + * @param property to set + * @param value the value to set + * @param deleteDurable should deleteDurable be set. + * @throws NamingException + * @throws IOException + * @throws ConfigurationException + */ + public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException + { + setProperty(CONFIG_SECTION + "("+_count+").subscriptionName", "clientid:"+getTestQueueName()); + + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + "policy.name", "TopicDelete"); + + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + property, value); + + if (deleteDurable) + { + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + } + _count++; + } + + + /** + * Test that setting messageCount takes affect on a durable Consumer + * + * Ensure we set the delete-persistent option + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + + public void testTopicDurableConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting depth has an effect on durable consumer topics + * + * Ensure we set the delete-persistent option + * + * Sets the message size for the test + * Sets the depth to be 9 * the depth + * Ensure that sending 10 messages causes the disconnection + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageSize() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true); + + //Start the broker + startBroker(); + + setMessageSize(MESSAGE_SIZE); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting messageAge has an effect on topics + * + * Ensure we set the delete-persistent option + * + * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) + * Send 10 messages and then ensure that we get disconnected as we will + * wait for the full timeout. + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageAge() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true); + + //Start the broker + startBroker(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java new file mode 100644 index 0000000000..9831c74574 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/TestingBaseCase.java @@ -0,0 +1,255 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.NamingException; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener +{ + + Topic _destination; + protected CountDownLatch _disconnectionLatch = new CountDownLatch(1); + protected int MAX_QUEUE_MESSAGE_COUNT; + protected int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE; + + private Thread _publisher; + protected static final long DISCONNECTION_WAIT = 5; + protected Exception _publisherError = null; + protected JMSException _connectionException = null; + private static final long JOIN_WAIT = 5000; + + @Override + public void setUp() throws Exception + { + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".slow-consumer-detection.delay", "1"); + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".slow-consumer-detection.timeunit", "SECONDS"); + + } + + + protected void setProperty(String property, String value) throws NamingException, IOException, ConfigurationException + { + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + property, value); + } + + + /** + * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT + * messages to the provided destination. Messages are sent in a new connection + * on a transaction. Any error is captured and the test is signalled to exit. + * + * @param destination + */ + private void startPublisher(final Destination destination) + { + _publisher = new Thread(new Runnable() + { + + public void run() + { + try + { + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer publisher = session.createProducer(destination); + + for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) + { + publisher.send(createNextMessage(session, count)); + session.commit(); + } + } + catch (Exception e) + { + _publisherError = e; + _disconnectionLatch.countDown(); + } + } + }); + + _publisher.start(); + } + + + + /** + * Perform the Main test of a topic Consumer with the given AckMode. + * + * Test creates a new connection and sets up the connection to prevent + * failover + * + * A new consumer is connected and started so that it will prefetch msgs. + * + * An asynchrounous publisher is started to fill the broker with messages. + * + * We then wait to be notified of the disconnection via the ExceptionListener + * + * 0-10 does not have the same notification paths but sync() apparently should + * give us the exception, currently it doesn't, so the test is excluded from 0-10 + * + * We should ensure that this test has the same path for all protocol versions. + * + * Clients should not have to modify their code based on the protocol in use. + * + * @param ackMode @see javax.jms.Session + * + * @throws Exception + */ + protected void topicConsumer(int ackMode, boolean durable) throws Exception + { + Connection connection = getConnection(); + + connection.setExceptionListener(this); + + Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode); + + _destination = session.createTopic(getName()); + + MessageConsumer consumer; + + if (durable) + { + consumer = session.createDurableSubscriber(_destination, getTestQueueName()); + } + else + { + consumer = session.createConsumer(_destination); + } + + connection.start(); + + // Start the consumer pre-fetching + // Don't care about response as we will fill the broker up with messages + // after this point and ensure that the client is disconnected at the + // right point. + consumer.receiveNoWait(); + startPublisher(_destination); + + boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); + + if (!disconnected && isBroker010()) + { + try + { + ((AMQSession_0_10) session).sync(); + } + catch (AMQException amqe) + { + JMSException jmsException = new JMSException(amqe.getMessage()); + jmsException.setLinkedException(amqe); + jmsException.initCause(amqe); + _connectionException = jmsException; + } + } + + assertTrue("Client was not disconnected.", _connectionException != null); + + Exception linked = _connectionException.getLinkedException(); + + _publisher.join(JOIN_WAIT); + + assertFalse("Publisher still running", _publisher.isAlive()); + + //Validate publishing occurred ok + if (_publisherError != null) + { + throw _publisherError; + } + + // NOTE these exceptions will need to be modeled so that they are not + // 0-8 specific. e.g. JMSSessionClosedException + + assertNotNull("No error received onException listener.", _connectionException); + + assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); + + assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass()); + + AMQChannelClosedException ccException = (AMQChannelClosedException) linked; + + assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); + } + + + // Exception Listener + + public void onException(JMSException e) + { + _connectionException = e; + + e.printStackTrace(); + + _disconnectionLatch.countDown(); + } + + /// Connection Listener + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + // Prevent Failover + return false; + } + + public boolean preResubscribe() + { + return false; + } + + public void failoverComplete() + { + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java new file mode 100644 index 0000000000..09c849cfde --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/TopicTest.java @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; + +import javax.naming.NamingException; +import java.io.IOException; + +/** + * This Topic test extends the Global queue test so it will run all the topic + * and subscription tests. + * + * We redefine the CONFIG_SECTION here so that the configuration is written + * against a topic element. + * + * To complete the migration to testing 'topic' elements we also override + * the setConfig to use the test name as the topic name. + * + */ +public class TopicTest extends GlobalQueuesTest +{ + private int _count=0; + + @Override + public void setUp() throws Exception + { + CONFIG_SECTION = ".topics.topic"; + super.setUp(); + } + + /** + * Add configuration for the queue that relates just to this test. + * We use the getTestQueueName() as our subscription. To ensure the + * config sections do not overlap we identify each section with a _count + * value. + * + * This would allow each test to configure more than one section. + * + * @param property to set + * @param value the value to set + * @param deleteDurable should deleteDurable be set. + * @throws NamingException + * @throws IOException + * @throws ConfigurationException + */ + @Override + public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException + { + setProperty(CONFIG_SECTION + "("+_count+").name", getName()); + + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + "policy.name", "TopicDelete"); + + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + property, value); + + if (deleteDurable) + { + setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + } + _count++; + } + + +} -- cgit v1.2.1