diff options
Diffstat (limited to 'qpid/java')
9 files changed, 251 insertions, 88 deletions
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java index c3f5d7e5ac..fbbb205ff0 100644 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java @@ -25,20 +25,19 @@ import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfigur import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostHouseKeepingPlugin; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; -class SlowConsumerDetection extends VirtualHostPlugin +class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { Logger _logger = Logger.getLogger(SlowConsumerDetection.class); - private VirtualHost _virtualhost; private SlowConsumerDetectionConfiguration _config; private SlowConsumerPolicyPlugin _policy; public static class SlowConsumerFactory implements VirtualHostPluginFactory { - public VirtualHostPlugin newInstance(VirtualHost vhost) + public VirtualHostHouseKeepingPlugin newInstance(VirtualHost vhost) { return new SlowConsumerDetection(vhost); } @@ -46,54 +45,42 @@ class SlowConsumerDetection extends VirtualHostPlugin public SlowConsumerDetection(VirtualHost vhost) { - _virtualhost = vhost; + super(vhost); _config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class); if (_config == null) { throw new IllegalArgumentException("Plugin has not been configured"); } - } @Override public void execute() { _logger.info("Starting the SlowConsumersDetection job"); - try + for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues()) { - for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues()) + _logger.debug("Checking consumer status for queue: " + + q.getName()); + try { - _logger.debug("Checking consumer status for queue: " - + q.getName()); - try - { - SlowConsumerDetectionQueueConfiguration config = - q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class); + SlowConsumerDetectionQueueConfiguration config = + q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class); - if (checkQueueStatus(q, config)) - { - config.getPolicy().performPolicy(q); - } - } - catch (Exception e) + if (checkQueueStatus(q, config)) { - _logger.error("Exception in SlowConsumersDetection " + - "for queue: " + - q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. + config.getPolicy().performPolicy(q); } } - _logger.info("SlowConsumersDetection job completed."); - } - catch (Exception e) - { - _logger.error("SlowConsumersDetection job failed: " + e.getMessage(), e); - } - catch (Error e) - { - _logger.error("SlowConsumersDetection job failed with error: " + e.getMessage(), e); + catch (Exception e) + { + _logger.error("Exception in SlowConsumersDetection " + + "for queue: " + + q.getNameShortString().toString(), e); + //Don't throw exceptions as this will stop the + // house keeping task from running. + } } + _logger.info("SlowConsumersDetection job completed."); } public long getDelay() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java index e552596058..67620d384b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; import org.apache.qpid.server.virtualhost.VirtualHost; import java.nio.ByteBuffer; @@ -189,7 +190,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener } _virtualHost = host; _id = host.getConfigStore().createId(); - _virtualHost.scheduleTask(_virtualHost.getBroker().getManagementPublishInterval(),_updateTask); + _virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost)); getConfigStore().addConfiguredObject(this); getQMFService().addListener(this); } @@ -484,17 +485,17 @@ public class ManagementExchange implements Exchange, QMFService.Listener - private final TimerTask _updateTask = new UpdateTask(); - - - private class UpdateTask extends TimerTask + private class UpdateTask extends HouseKeepingTask { + public UpdateTask(VirtualHost vhost) + { + super(vhost); + } - public void run() + public void execute() { publishAllConsole(); publishAllSchema(); - } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 780f5f1159..09ae3bd920 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -213,6 +213,6 @@ public class VirtualHostConfiguration extends ConfigurationPlugin public int getHouseKeepingThreadCount() { - return _config.getInt("housekeeping.threadCount", Runtime.getRuntime().availableProcessors()); + return _config.getInt("housekeeping.poolSize", Runtime.getRuntime().availableProcessors()); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java new file mode 100644 index 0000000000..1f4dd56eb1 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import org.apache.log4j.Logger; + +public abstract class HouseKeepingTask implements Runnable +{ + Logger _logger = Logger.getLogger(this.getClass()); + + protected VirtualHost _virtualhost; + + private String _name; + + public HouseKeepingTask(VirtualHost vhost) + { + _virtualhost = vhost; + _name = _virtualhost.getName() + ":" + this.getClass().getSimpleName(); + } + + final public void run() + { + // Don't need to undo this as this is a thread pool thread so will + // always go through here before we do any real work. + Thread.currentThread().setName(_name); + try + { + execute(); + } + catch (Throwable e) + { + _logger.warn(this.getClass().getSimpleName() + " throw exception: " + e); + } + } + + + /** Execute the plugin. */ + public abstract void execute(); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index c140e4a144..a5d75d8574 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -37,8 +37,10 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.binding.BindingFactory; +import java.util.List; import java.util.UUID; import java.util.TimerTask; +import java.util.concurrent.FutureTask; public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig { @@ -70,8 +72,17 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo UUID getBrokerId(); - void scheduleTask(long period, TimerTask task); + void scheduleHouseKeepingTask(long period, HouseKeepingTask task); + long getHouseKeepingTaskCount(); + + public long getHouseKeepingCompletedTaskCount(); + + int getHouseKeepingPoolSize(); + + void setHouseKeepingPoolSize(int newSize); + + int getHouseKeepingActiveCount(); IApplicationRegistry getApplicationRegistry(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 0252d265fd..413ebe159e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -29,7 +29,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostHouseKeepingPlugin; import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.configuration.BrokerConfig; import org.apache.qpid.server.configuration.ConfigStore; @@ -71,11 +71,12 @@ import javax.management.NotCompliantMBeanException; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.Timer; import java.util.TimerTask; import java.util.UUID; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -103,7 +104,7 @@ public class VirtualHostImpl implements Accessable, VirtualHost private ACLManager _accessManager; - private final Timer _timer; + private final ScheduledThreadPoolExecutor _houseKeepingTasks; private final IApplicationRegistry _appRegistry; private VirtualHostConfiguration _configuration; private DurableConfigurationStore _durableConfigurationStore; @@ -114,6 +115,7 @@ public class VirtualHostImpl implements Accessable, VirtualHost private final long _createTime = System.currentTimeMillis(); private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); + private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; public void setAccessableName(String name) { @@ -217,7 +219,7 @@ public class VirtualHostImpl implements Accessable, VirtualHost _connectionRegistry = new ConnectionRegistry(this); - _timer = new Timer("TimerThread-" + _name + ":", true); + _houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount()); _queueRegistry = new DefaultQueueRegistry(this); @@ -290,33 +292,35 @@ public class VirtualHostImpl implements Accessable, VirtualHost /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ if (period != 0L) { - class HouseKeepingTask extends TimerTask + class ExpiredMessagesTask extends HouseKeepingTask { - Logger _hkLogger = Logger.getLogger(HouseKeepingTask.class); - - public void run() + public ExpiredMessagesTask(VirtualHost vhost) + { + super(vhost); + } + + public void execute() { - _hkLogger.info("Starting the houseKeeping job"); for (AMQQueue q : _queueRegistry.getQueues()) { - _hkLogger.debug("Checking message status for queue: "+q.getName().toString()); + _logger.debug("Checking message status for queue: " + + q.getName()); try { q.checkMessageStatus(); } catch (Exception e) { - _hkLogger.error("Exception in housekeeping for queue: " + q.getNameShortString().toString(), e); + _logger.error("Exception in housekeeping for queue: " + + q.getNameShortString().toString(), e); //Don't throw exceptions as this will stop the // house keeping task from running. } } - _hkLogger.info("HouseKeeping job completed."); } } - final TimerTask expiredMessagesTask = new HouseKeepingTask(); - scheduleTask(period, expiredMessagesTask); + scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); class ForceChannelClosuresTask extends TimerTask { @@ -332,14 +336,12 @@ public class VirtualHostImpl implements Accessable, VirtualHost if (plugins != null) { - ScheduledThreadPoolExecutor vhostTasks - = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount()); - for (String pluginName : plugins.keySet()) { try { - VirtualHostPlugin plugin = plugins.get(pluginName).newInstance(this); + VirtualHostHouseKeepingPlugin plugin = + plugins.get(pluginName).newInstance(this); TimeUnit units = TimeUnit.MILLISECONDS; @@ -359,7 +361,7 @@ public class VirtualHostImpl implements Accessable, VirtualHost } } - vhostTasks.scheduleAtFixedRate(plugin, plugin.getDelay() / 2, + _houseKeepingTasks.scheduleAtFixedRate(plugin, plugin.getDelay() / 2, plugin.getDelay(), units); _logger.info("Loaded VirtualHostPlugin:" + plugin); @@ -377,9 +379,42 @@ public class VirtualHostImpl implements Accessable, VirtualHost } } - public void scheduleTask(final long period, final TimerTask task) + /** + * Allow other broker components to register a HouseKeepingTask + * + * @param period How often this task should run, in ms. + * @param task The task to run. + */ + public void scheduleHouseKeepingTask(long period, HouseKeepingTask task) + { + _houseKeepingTasks.scheduleAtFixedRate(task, period / 2, period, + TimeUnit.MILLISECONDS); + } + + public long getHouseKeepingTaskCount() { - _timer.scheduleAtFixedRate(task, period / 2, period); + return _houseKeepingTasks.getTaskCount(); + } + + public long getHouseKeepingCompletedTaskCount() + { + return _houseKeepingTasks.getCompletedTaskCount(); + } + + public int getHouseKeepingPoolSize() + { + return _houseKeepingTasks.getCorePoolSize(); + } + + public void setHouseKeepingPoolSize(int newSize) + { + _houseKeepingTasks.setCorePoolSize(newSize); + } + + + public int getHouseKeepingActiveCount() + { + return _houseKeepingTasks.getActiveCount(); } @@ -588,9 +623,14 @@ public class VirtualHostImpl implements Accessable, VirtualHost } //Stop Housekeeping - if (_timer != null) + if (_houseKeepingTasks != null) { - _timer.cancel(); + _houseKeepingTasks.shutdown(); + + if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) + { + _houseKeepingTasks.shutdownNow(); + } } //Close MessageStore diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java index e30b5e1934..e76844fa3a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPlugin.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostHouseKeepingPlugin.java @@ -20,25 +20,16 @@ */ package org.apache.qpid.server.virtualhost.plugins; -import org.apache.log4j.Logger; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; +import org.apache.qpid.server.virtualhost.VirtualHost; -public abstract class VirtualHostPlugin implements Runnable +public abstract class VirtualHostHouseKeepingPlugin extends HouseKeepingTask { - Logger _logger = Logger.getLogger(this.getClass()); - - final public void run() + public VirtualHostHouseKeepingPlugin(VirtualHost vhost) { - try - { - execute(); - } - catch (Throwable e) - { - _logger.warn(this.getClass().getSimpleName()+" throw exception: " + e); - } + super(vhost); } - /** * Long value representing the delay between repeats * @@ -48,15 +39,11 @@ public abstract class VirtualHostPlugin implements Runnable /** * Option to specify what the delay value represents - * @see java.util.concurrent.TimeUnit for valid value. + * * @return + * + * @see java.util.concurrent.TimeUnit for valid value. */ public abstract String getTimeUnit(); - /** - * Execute the plugin. - */ - public abstract void execute(); - - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java index c32ff76f81..c8bea18444 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/VirtualHostPluginFactory.java @@ -24,5 +24,5 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public interface VirtualHostPluginFactory { - public VirtualHostPlugin newInstance(VirtualHost vhost); + public VirtualHostHouseKeepingPlugin newInstance(VirtualHost vhost); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index 683343aa14..c1e2406167 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -129,8 +129,87 @@ public class VirtualHostConfigurationTest extends TestCase assertEquals(3, bTest.getMaximumMessageAge()); ApplicationRegistry.remove(); + } + + /** + * Test that the house keeping pool sizes is correctly processed + * + * @throws Exception + */ + public void testHouseKeepingThreadCount() throws Exception + { + int initialPoolSize = 10; + + configXml.addProperty("virtualhost.testHouseKeepingThreadCount.name", "testHouseKeepingThreadCount"); + configXml.addProperty("virtualhost.testHouseKeepingThreadCount.housekeeping.poolSize", + initialPoolSize); + + VirtualHostConfiguration config = + new VirtualHostConfiguration("testHouseKeepingThreadCount", + configXml.subset("virtualhost.testHouseKeepingThreadCount")); + VirtualHost vhost = + ApplicationRegistry.getInstance().createVirtualHost(config); + + assertEquals("HouseKeeping PoolSize not set correctly.", + initialPoolSize, vhost.getHouseKeepingPoolSize()); + + ApplicationRegistry.remove(); + } + + /** + * Test default house keeping tasks + * + * @throws Exception + */ + public void testDefaultHouseKeepingTasks() throws Exception + { + configXml.addProperty("virtualhost.testDefaultHouseKeepingTasks.name", "testDefaultHouseKeepingTasks"); + VirtualHostConfiguration config = + new VirtualHostConfiguration("testDefaultHouseKeepingTasks", + configXml.subset("virtualhost.testDefaultHouseKeepingTasks")); + VirtualHost vhost = + ApplicationRegistry.getInstance().createVirtualHost(config); + assertEquals("Default houseKeeping task count incorrect.", 2, + vhost.getHouseKeepingTaskCount()); + // Currently the two are tasks: + // ExpiredMessageTask from VirtualHost + // UpdateTask from the QMF ManagementExchange + + + ApplicationRegistry.remove(); } + /** + * Test that we can dynamically change the thread pool size + * + * @throws Exception + */ + public void testDynamicHouseKeepingPoolSizeChange() throws Exception + { + int initialPoolSize = 10; + + configXml.addProperty("virtualhost.testDynamicHouseKeepingPoolSizeChange.name", "testDynamicHouseKeepingPoolSizeChange"); + configXml.addProperty("virtualhost.testDynamicHouseKeepingPoolSizeChange.housekeeping.poolSize", + initialPoolSize); + + VirtualHostConfiguration config = + new VirtualHostConfiguration("testHouseKeepingThreadCount", + configXml.subset("virtualhost.testDynamicHouseKeepingPoolSizeChange")); + VirtualHost vhost = + ApplicationRegistry.getInstance().createVirtualHost(config); + + assertEquals("HouseKeeping PoolSize not set correctly.", + initialPoolSize, vhost.getHouseKeepingPoolSize()); + + vhost.setHouseKeepingPoolSize(1); + + assertEquals("HouseKeeping PoolSize not correctly change.", + 1, vhost.getHouseKeepingPoolSize()); + + ApplicationRegistry.remove(); + } + + } |
