diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2010-06-02 16:41:42 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2010-06-02 16:41:42 +0000 |
| commit | 6294e42f0ff2d0478c35b9e0dad211f373b45c8b (patch) | |
| tree | 6e38f28cf4c259d4538aa9f9de5b69a107e7a7b6 /qpid/java | |
| parent | adb75b9462b1e494fc074405a3d89fdaf39a1bb6 (diff) | |
| download | qpid-python-6294e42f0ff2d0478c35b9e0dad211f373b45c8b.tar.gz | |
QPID-1447 : Finish testing SCD plugin,
Now tests durable subscriptions and all three triggers, depth,messageAge,messageCount
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@950637 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
2 files changed, 217 insertions, 35 deletions
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java index def337e241..74b5a12d20 100644 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java @@ -60,7 +60,7 @@ public class TopicDeletePolicyConfiguration extends ConfigurationPlugin @Override public void validateConfiguration() throws ConfigurationException { - // No validation requried. + // No validation required. } public boolean deletePersistent() diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java index 2008153338..f0be3d2db0 100644 --- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java @@ -35,7 +35,9 @@ import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.Topic; import javax.naming.NamingException; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -47,7 +49,7 @@ import java.util.concurrent.TimeUnit; */ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener { - Destination _destination; + Topic _destination; private CountDownLatch _disconnectionLatch = new CountDownLatch(1); private int MAX_QUEUE_MESSAGE_COUNT; private int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE; @@ -59,9 +61,8 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis private static final long JOIN_WAIT = 5000; @Override - public void setUp() throws Exception, ConfigurationException, NamingException + public void setUp() throws IOException, ConfigurationException, NamingException { - // Set the houseKeepingThread period to be every 500 setConfigurationProperty("virtualhosts.virtualhost." + getConnectionURL().getVirtualHost().substring(1) + ".slow-consumer-detection.delay", "1"); @@ -75,11 +76,6 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis ".queues.slow-consumer-detection." + "policy.name", "TopicDelete"); - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".queues.maximumMessageCount", "1"); - - /** * Queue Configuration @@ -100,13 +96,13 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis <topicDelete> <delete-persistent/> </topicDelete> - </policy> + </policy> </slow-consumer-detection> */ /** - * Plugin Configuration + * VirtualHost Plugin Configuration <slow-consumer-detection> <delay>1</delay> @@ -116,17 +112,29 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis */ } - public void exclusiveTransientQueue(int ackMode) throws Exception - { - - } - - public void tempQueue(int ackMode) throws Exception - { - - } - - public void topicConsumer(int ackMode) throws Exception + /** + * Perform the Main test of a topic Consumer with the given AckMode. + * + * Test creates a new connection and sets up the connection to prevent + * failover + * + * A new consumer is connected and started so that it will prefetch msgs. + * + * An asynchrounous publisher is started to fill the broker with messages. + * + * We then wait to be notified of the disconnection via the ExceptionListener + * + * 0-10 does not have the same notification paths but sync() apparently should + * give us the exception, currently it doesn't, so the test is excluded from 0-10 + * + * We should ensure that this test has the same path for all protocol versions. + * + * Clients should not have to modify their code based on the protocol in use. + * + * @param ackMode @see javax.jms.Session + * @throws Exception + */ + public void topicConsumer(int ackMode, boolean durable) throws Exception { Connection connection = getConnection(); @@ -136,7 +144,16 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis _destination = session.createTopic(getName()); - MessageConsumer consumer = session.createConsumer(_destination); + MessageConsumer consumer; + + if (durable) + { + consumer = session.createDurableSubscriber(_destination, getTestQueueName()); + } + else + { + consumer = session.createConsumer(_destination); + } connection.start(); @@ -149,8 +166,6 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); - System.out.println("Validating"); - if (!disconnected && isBroker010()) { try @@ -166,24 +181,23 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis } } - System.err.println("ConnectionException:" + _connectionException); - assertTrue("Client was not disconnected.", _connectionException != null); Exception linked = _connectionException.getLinkedException(); - System.err.println("Linked:" + linked); - _publisher.join(JOIN_WAIT); assertFalse("Publisher still running", _publisher.isAlive()); - + //Validate publishing occurred ok if (_publisherError != null) { throw _publisherError; } + // NOTE these exceptions will need to be modeled so that they are not + // 0-8 specific. e.g. JMSSessionClosedException + assertNotNull("No error received onException listener.", _connectionException); assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); @@ -195,6 +209,13 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); } + /** + * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT + * messages to the provided destination. Messages are sent in a new connection + * on a transaction. Any error is captured and the test is signalled to exit. + * + * @param destination + */ private void startPublisher(final Destination destination) { _publisher = new Thread(new Runnable() @@ -209,8 +230,6 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis MessageProducer publisher = session.createProducer(destination); - setMessageSize(MESSAGE_SIZE); - for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) { publisher.send(createNextMessage(session, count)); @@ -228,7 +247,102 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis _publisher.start(); } - public void testAutoAckTopicConsumerMessageCount() throws Exception + /** + * Test to write: Check that exclusive Transient Queues are not + * disconnected. i.e. JMS Temporary Queues + * + * @param ackMode + * + * @throws Exception + */ + public void exclusiveTransientQueue(int ackMode) throws Exception + { + + } + + /** + * Test that setting messageCount takes affect on topics + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + public void testTopicConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.slow-consumer-detection." + + "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1)); + + //Start the broker + super.setUp(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting depth has an effect on topics + * + * Sets the message size for the test + * Sets the depth to be 9 * the depth + * Ensure that sending 10 messages causes the disconnection + * + * @throws Exception + */ + public void testTopicConsumerMessageSize() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.slow-consumer-detection." + + "depth", String.valueOf(MESSAGE_SIZE * 9)); + + //Start the broker + super.setUp(); + + setMessageSize(MESSAGE_SIZE); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting messageAge has an effect on topics + * + * Sets the messageAge to be half the disconnection wait timeout + * Send 10 messages and then ensure that we get disconnected as we will + * wait for the full timeout. + * + * @throws Exception + */ + public void testTopicConsumerMessageAge() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.slow-consumer-detection." + + "messageAge", String.valueOf(DISCONNECTION_WAIT / 2)); + + //Start the broker + super.setUp(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, false); + } + + /** + * Test that setting messageCount takes affect on a durable Consumer + * + * Ensure we set the delete-persistent option + * + * We send 10 messages and disconnect at 9 + * + * @throws Exception + */ + + public void testTopicDurableConsumerMessageCount() throws Exception { MAX_QUEUE_MESSAGE_COUNT = 10; @@ -237,20 +351,88 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis ".queues.slow-consumer-detection." + "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1)); + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + //Start the broker super.setUp(); + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting depth has an effect on durable consumer topics + * + * Ensure we set the delete-persistent option + * + * Sets the message size for the test + * Sets the depth to be 9 * the depth + * Ensure that sending 10 messages causes the disconnection + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageSize() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.slow-consumer-detection." + + "depth", String.valueOf(MESSAGE_SIZE * 9)); + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + + //Start the broker + super.setUp(); setMessageSize(MESSAGE_SIZE); - topicConsumer(Session.AUTO_ACKNOWLEDGE); + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); + } + + /** + * Test that setting messageAge has an effect on topics + * + * Ensure we set the delete-persistent option + * + * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) + * Send 10 messages and then ensure that we get disconnected as we will + * wait for the full timeout. + * + * @throws Exception + */ + public void testTopicDurableConsumerMessageAge() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.slow-consumer-detection." + + "messageAge", String.valueOf(DISCONNECTION_WAIT / 5)); + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + + //Start the broker + super.setUp(); + + topicConsumer(Session.AUTO_ACKNOWLEDGE, true); } + // Exception Listener + public void onException(JMSException e) { _connectionException = e; - System.out.println("***** SCT Received Exception: "+e); + System.out.println("***** SCT Received Exception: " + e); e.printStackTrace(); _disconnectionLatch.countDown(); |
