summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-06-02 16:41:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-06-02 16:41:42 +0000
commit6294e42f0ff2d0478c35b9e0dad211f373b45c8b (patch)
tree6e38f28cf4c259d4538aa9f9de5b69a107e7a7b6 /qpid/java
parentadb75b9462b1e494fc074405a3d89fdaf39a1bb6 (diff)
downloadqpid-python-6294e42f0ff2d0478c35b9e0dad211f373b45c8b.tar.gz
QPID-1447 : Finish testing SCD plugin,
Now tests durable subscriptions and all three triggers, depth,messageAge,messageCount git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@950637 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java2
-rw-r--r--qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java250
2 files changed, 217 insertions, 35 deletions
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java
index def337e241..74b5a12d20 100644
--- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java
+++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java
@@ -60,7 +60,7 @@ public class TopicDeletePolicyConfiguration extends ConfigurationPlugin
@Override
public void validateConfiguration() throws ConfigurationException
{
- // No validation requried.
+ // No validation required.
}
public boolean deletePersistent()
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
index 2008153338..f0be3d2db0 100644
--- a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
+++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
@@ -35,7 +35,9 @@ import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.Topic;
import javax.naming.NamingException;
+import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -47,7 +49,7 @@ import java.util.concurrent.TimeUnit;
*/
public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener
{
- Destination _destination;
+ Topic _destination;
private CountDownLatch _disconnectionLatch = new CountDownLatch(1);
private int MAX_QUEUE_MESSAGE_COUNT;
private int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE;
@@ -59,9 +61,8 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
private static final long JOIN_WAIT = 5000;
@Override
- public void setUp() throws Exception, ConfigurationException, NamingException
+ public void setUp() throws IOException, ConfigurationException, NamingException
{
- // Set the houseKeepingThread period to be every 500
setConfigurationProperty("virtualhosts.virtualhost."
+ getConnectionURL().getVirtualHost().substring(1) +
".slow-consumer-detection.delay", "1");
@@ -75,11 +76,6 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
".queues.slow-consumer-detection." +
"policy.name", "TopicDelete");
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".queues.maximumMessageCount", "1");
-
-
/**
* Queue Configuration
@@ -100,13 +96,13 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
<topicDelete>
<delete-persistent/>
</topicDelete>
- </policy>
+ </policy>
</slow-consumer-detection>
*/
/**
- * Plugin Configuration
+ * VirtualHost Plugin Configuration
<slow-consumer-detection>
<delay>1</delay>
@@ -116,17 +112,29 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
*/
}
- public void exclusiveTransientQueue(int ackMode) throws Exception
- {
-
- }
-
- public void tempQueue(int ackMode) throws Exception
- {
-
- }
-
- public void topicConsumer(int ackMode) throws Exception
+ /**
+ * Perform the Main test of a topic Consumer with the given AckMode.
+ *
+ * Test creates a new connection and sets up the connection to prevent
+ * failover
+ *
+ * A new consumer is connected and started so that it will prefetch msgs.
+ *
+ * An asynchrounous publisher is started to fill the broker with messages.
+ *
+ * We then wait to be notified of the disconnection via the ExceptionListener
+ *
+ * 0-10 does not have the same notification paths but sync() apparently should
+ * give us the exception, currently it doesn't, so the test is excluded from 0-10
+ *
+ * We should ensure that this test has the same path for all protocol versions.
+ *
+ * Clients should not have to modify their code based on the protocol in use.
+ *
+ * @param ackMode @see javax.jms.Session
+ * @throws Exception
+ */
+ public void topicConsumer(int ackMode, boolean durable) throws Exception
{
Connection connection = getConnection();
@@ -136,7 +144,16 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
_destination = session.createTopic(getName());
- MessageConsumer consumer = session.createConsumer(_destination);
+ MessageConsumer consumer;
+
+ if (durable)
+ {
+ consumer = session.createDurableSubscriber(_destination, getTestQueueName());
+ }
+ else
+ {
+ consumer = session.createConsumer(_destination);
+ }
connection.start();
@@ -149,8 +166,6 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS);
- System.out.println("Validating");
-
if (!disconnected && isBroker010())
{
try
@@ -166,24 +181,23 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
}
}
- System.err.println("ConnectionException:" + _connectionException);
-
assertTrue("Client was not disconnected.", _connectionException != null);
Exception linked = _connectionException.getLinkedException();
- System.err.println("Linked:" + linked);
-
_publisher.join(JOIN_WAIT);
assertFalse("Publisher still running", _publisher.isAlive());
-
+
//Validate publishing occurred ok
if (_publisherError != null)
{
throw _publisherError;
}
+ // NOTE these exceptions will need to be modeled so that they are not
+ // 0-8 specific. e.g. JMSSessionClosedException
+
assertNotNull("No error received onException listener.", _connectionException);
assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked);
@@ -195,6 +209,13 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode());
}
+ /**
+ * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT
+ * messages to the provided destination. Messages are sent in a new connection
+ * on a transaction. Any error is captured and the test is signalled to exit.
+ *
+ * @param destination
+ */
private void startPublisher(final Destination destination)
{
_publisher = new Thread(new Runnable()
@@ -209,8 +230,6 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
MessageProducer publisher = session.createProducer(destination);
- setMessageSize(MESSAGE_SIZE);
-
for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++)
{
publisher.send(createNextMessage(session, count));
@@ -228,7 +247,102 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
_publisher.start();
}
- public void testAutoAckTopicConsumerMessageCount() throws Exception
+ /**
+ * Test to write: Check that exclusive Transient Queues are not
+ * disconnected. i.e. JMS Temporary Queues
+ *
+ * @param ackMode
+ *
+ * @throws Exception
+ */
+ public void exclusiveTransientQueue(int ackMode) throws Exception
+ {
+
+ }
+
+ /**
+ * Test that setting messageCount takes affect on topics
+ *
+ * We send 10 messages and disconnect at 9
+ *
+ * @throws Exception
+ */
+ public void testTopicConsumerMessageCount() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfigurationProperty("virtualhosts.virtualhost." +
+ getConnectionURL().getVirtualHost().substring(1) +
+ ".queues.slow-consumer-detection." +
+ "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1));
+
+ //Start the broker
+ super.setUp();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ /**
+ * Test that setting depth has an effect on topics
+ *
+ * Sets the message size for the test
+ * Sets the depth to be 9 * the depth
+ * Ensure that sending 10 messages causes the disconnection
+ *
+ * @throws Exception
+ */
+ public void testTopicConsumerMessageSize() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfigurationProperty("virtualhosts.virtualhost." +
+ getConnectionURL().getVirtualHost().substring(1) +
+ ".queues.slow-consumer-detection." +
+ "depth", String.valueOf(MESSAGE_SIZE * 9));
+
+ //Start the broker
+ super.setUp();
+
+ setMessageSize(MESSAGE_SIZE);
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ /**
+ * Test that setting messageAge has an effect on topics
+ *
+ * Sets the messageAge to be half the disconnection wait timeout
+ * Send 10 messages and then ensure that we get disconnected as we will
+ * wait for the full timeout.
+ *
+ * @throws Exception
+ */
+ public void testTopicConsumerMessageAge() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfigurationProperty("virtualhosts.virtualhost." +
+ getConnectionURL().getVirtualHost().substring(1) +
+ ".queues.slow-consumer-detection." +
+ "messageAge", String.valueOf(DISCONNECTION_WAIT / 2));
+
+ //Start the broker
+ super.setUp();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ /**
+ * Test that setting messageCount takes affect on a durable Consumer
+ *
+ * Ensure we set the delete-persistent option
+ *
+ * We send 10 messages and disconnect at 9
+ *
+ * @throws Exception
+ */
+
+ public void testTopicDurableConsumerMessageCount() throws Exception
{
MAX_QUEUE_MESSAGE_COUNT = 10;
@@ -237,20 +351,88 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis
".queues.slow-consumer-detection." +
"messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1));
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ ".queues.slow-consumer-detection." +
+ "policy.topicdelete.delete-persistent", "");
+
//Start the broker
super.setUp();
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+ /**
+ * Test that setting depth has an effect on durable consumer topics
+ *
+ * Ensure we set the delete-persistent option
+ *
+ * Sets the message size for the test
+ * Sets the depth to be 9 * the depth
+ * Ensure that sending 10 messages causes the disconnection
+ *
+ * @throws Exception
+ */
+ public void testTopicDurableConsumerMessageSize() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfigurationProperty("virtualhosts.virtualhost." +
+ getConnectionURL().getVirtualHost().substring(1) +
+ ".queues.slow-consumer-detection." +
+ "depth", String.valueOf(MESSAGE_SIZE * 9));
+
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ ".queues.slow-consumer-detection." +
+ "policy.topicdelete.delete-persistent", "");
+
+ //Start the broker
+ super.setUp();
setMessageSize(MESSAGE_SIZE);
- topicConsumer(Session.AUTO_ACKNOWLEDGE);
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+ /**
+ * Test that setting messageAge has an effect on topics
+ *
+ * Ensure we set the delete-persistent option
+ *
+ * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec)
+ * Send 10 messages and then ensure that we get disconnected as we will
+ * wait for the full timeout.
+ *
+ * @throws Exception
+ */
+ public void testTopicDurableConsumerMessageAge() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ setConfigurationProperty("virtualhosts.virtualhost." +
+ getConnectionURL().getVirtualHost().substring(1) +
+ ".queues.slow-consumer-detection." +
+ "messageAge", String.valueOf(DISCONNECTION_WAIT / 5));
+
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ ".queues.slow-consumer-detection." +
+ "policy.topicdelete.delete-persistent", "");
+
+ //Start the broker
+ super.setUp();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
}
+ // Exception Listener
+
public void onException(JMSException e)
{
_connectionException = e;
- System.out.println("***** SCT Received Exception: "+e);
+ System.out.println("***** SCT Received Exception: " + e);
e.printStackTrace();
_disconnectionLatch.countDown();