diff options
Diffstat (limited to 'qpid/java')
8 files changed, 295 insertions, 359 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java index c8ee61685c..5ef56045bb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java @@ -1,22 +1,3 @@ -package org.apache.qpid.client.prefetch; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -37,6 +18,26 @@ import org.slf4j.LoggerFactory; * under the License. * */ +package org.apache.qpid.client.prefetch; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + public class PrefetchBehaviourTest extends QpidBrokerTestCase { protected static final Logger _logger = LoggerFactory.getLogger(PrefetchBehaviourTest.class); @@ -130,7 +131,59 @@ public class PrefetchBehaviourTest extends QpidBrokerTestCase //wait for the other consumer to finish to ensure it completes ok _logger.debug("waiting for async consumer to complete"); assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS)); - assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get()); + assertFalse("Unexpected exception during async message processing",_exceptionCaught.get()); + } + + /** + * This test was originally known as AMQConnectionTest#testPrefetchSystemProperty. + * + */ + public void testLowPrefetchCausesMessagesToBeDistributedBetweenConsumers() throws Exception + { + Queue queue = getTestQueue(); + + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); + + Connection connection = getConnection(); + connection.start(); + // Create two consumers on different sessions + Session consSessA = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumerA = consSessA.createConsumer(queue); + _logger.debug("Consumer A " + consumerA); + + Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + sendMessage(producerSession, queue, 3); + + MessageConsumer consumerB = null; + + if (isBroker010()) + { + consumerB = consSessA.createConsumer(queue); + } + else + { + // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer. + Session consSessB = connection.createSession(true, Session.SESSION_TRANSACTED); + consumerB = consSessB.createConsumer(queue); + } + _logger.debug("Consumer B " + consumerB); + + Message msg; + // Check that consumer A has 2 messages + for (int i = 0; i < 2; i++) + { + msg = consumerA.receive(1500); + assertNotNull("Consumer A should receive 2 messages", msg); + } + + _logger.debug("Checking that Consumer A does not have 3rd message"); + msg = consumerA.receive(1500); + assertNull("Consumer A should not have received a 3rd message",msg); + + // Check that consumer B has the last message + _logger.debug("Checking that Consumer B does have 3rd message"); + msg = consumerB.receive(1500); + assertNotNull("Consumer B should have received the message",msg); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java deleted file mode 100644 index 474a425b28..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.client; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TopicSession; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQConnectionTest extends QpidBrokerTestCase -{ - protected static AMQConnection _connection; - protected static AMQTopic _topic; - protected static AMQQueue _queue; - private static QueueSession _queueSession; - private static TopicSession _topicSession; - protected static final Logger _logger = LoggerFactory.getLogger(AMQConnectionTest.class); - - protected void setUp() throws Exception - { - super.setUp(); - createConnection(); - _topic = new AMQTopic(_connection.getDefaultTopicExchangeName(), new AMQShortString("mytopic")); - _queue = new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("myqueue")); - } - - @Override - protected void tearDown() throws Exception - { - _connection.close(); - super.tearDown(); //To change body of overridden methods use File | Settings | File Templates. - } - - protected void createConnection() throws Exception - { - _connection = (AMQConnection) getConnection("guest", "guest"); - } - - /** - * Simple tests to check we can create TopicSession and QueueSession ok - * And that they throw exceptions where appropriate as per JMS spec - */ - - public void testCreateQueueSession() throws JMSException - { - createQueueSession(); - } - - private void createQueueSession() throws JMSException - { - _queueSession = _connection.createQueueSession(false, AMQSession.NO_ACKNOWLEDGE); - } - - public void testCreateTopicSession() throws JMSException - { - createTopicSession(); - } - - private void createTopicSession() throws JMSException - { - _topicSession = _connection.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); - } - - public void testTopicSessionCreateBrowser() throws JMSException - { - createTopicSession(); - try - { - _topicSession.createBrowser(_queue); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testTopicSessionCreateQueue() throws JMSException - { - createTopicSession(); - try - { - _topicSession.createQueue("abc"); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testTopicSessionCreateTemporaryQueue() throws JMSException - { - createTopicSession(); - try - { - _topicSession.createTemporaryQueue(); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testQueueSessionCreateTemporaryTopic() throws JMSException - { - createQueueSession(); - try - { - _queueSession.createTemporaryTopic(); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testQueueSessionCreateTopic() throws JMSException - { - createQueueSession(); - try - { - _queueSession.createTopic("abc"); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testQueueSessionDurableSubscriber() throws JMSException - { - createQueueSession(); - try - { - _queueSession.createDurableSubscriber(_topic, "abc"); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testQueueSessionUnsubscribe() throws JMSException - { - createQueueSession(); - try - { - _queueSession.unsubscribe("abc"); - fail("expected exception did not occur"); - } - catch (javax.jms.IllegalStateException s) - { - // ok - } - catch (Exception e) - { - fail("expected javax.jms.IllegalStateException, got " + e); - } - } - - public void testPrefetchSystemProperty() throws Exception - { - _connection.close(); - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString()); - - createConnection(); - _connection.start(); - // Create two consumers on different sessions - Session consSessA = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumerA = consSessA.createConsumer(_queue); - - Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = producerSession.createProducer(_queue); - - // Send 3 messages - for (int i = 0; i < 3; i++) - { - producer.send(producerSession.createTextMessage("test")); - } - producerSession.commit(); - - MessageConsumer consumerB = null; - // 0-8, 0-9, 0-9-1 prefetch is per session, not consumer. - if (!isBroker010()) - { - Session consSessB = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - consumerB = consSessB.createConsumer(_queue); - } - else - { - consumerB = consSessA.createConsumer(_queue); - } - - Message msg; - // Check that consumer A has 2 messages - for (int i = 0; i < 2; i++) - { - msg = consumerA.receive(1500); - assertNotNull("Consumer A should receive 2 messages",msg); - } - - msg = consumerA.receive(1500); - assertNull("Consumer A should not have received a 3rd message",msg); - - // Check that consumer B has the last message - msg = consumerB.receive(1500); - assertNotNull("Consumer B should have received the message",msg); - } - - - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java deleted file mode 100644 index 5f3daa407a..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSSLConnectionTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.test.unit.client; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionURL; - -public class AMQSSLConnectionTest extends AMQConnectionTest -{ - private static final String KEYSTORE = "test-profiles/test_resources/ssl/java_client_keystore.jks"; - private static final String KEYSTORE_PASSWORD = "password"; - private static final String TRUSTSTORE = "test-profiles/test_resources/ssl/java_client_truststore.jks"; - private static final String TRUSTSTORE_PASSWORD = "password"; - - @Override - protected void setUp() throws Exception - { - setTestClientSystemProperty("profile.use_ssl", "true"); - setConfigurationProperty("connector.ssl.enabled", "true"); - setConfigurationProperty("connector.ssl.sslOnly", "true"); - super.setUp(); - } - - protected void createConnection() throws Exception - { - - final String sslPrototypeUrl = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" + - "?ssl='true'&ssl_verify_hostname='false'" + - "&key_store='%s'&key_store_password='%s'" + - "&trust_store='%s'&trust_store_password='%s'" + - "'"; - - final String url = String.format(sslPrototypeUrl,System.getProperty("test.port.ssl"), - KEYSTORE,KEYSTORE_PASSWORD,TRUSTSTORE,TRUSTSTORE_PASSWORD); - - _connection = (AMQConnection) getConnection(new AMQConnectionURL(url)); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java index 93cceb1048..c33dde53b7 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQSessionTest.java @@ -101,10 +101,4 @@ public class AMQSessionTest extends QpidBrokerTestCase assertEquals("Queue names should match from QueueReceiver with selector", _queue.getQueueName(), receiver.getQueue().getQueueName()); } - public static void stopVmBrokers() - { - _queue = null; - _topic = null; - _session = null; - } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java new file mode 100644 index 0000000000..ef90ab8ffe --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/QueueSessionFactoryTest.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Ensures that queue specific session factory method {@link QueueConnection#createQueueSession()} create sessions + * of type {@link QueueSession} and that those sessions correctly restrict the available JMS operations + * operations to exclude those applicable to only topics. + * + * @see TopicSessionFactoryTest + */ +public class QueueSessionFactoryTest extends QpidBrokerTestCase +{ + public void testQueueSessionIsNotATopicSession() throws Exception + { + QueueSession queueSession = getQueueSession(); + assertFalse(queueSession instanceof TopicSession); + } + + public void testQueueSessionCannotCreateTemporaryTopics() throws Exception + { + QueueSession queueSession = getQueueSession(); + try + { + queueSession.createTemporaryTopic(); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createTemporaryTopic from QueueSession", s.getMessage()); + } + } + + public void testQueueSessionCannotCreateTopics() throws Exception + { + QueueSession queueSession = getQueueSession(); + try + { + queueSession.createTopic("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createTopic from QueueSession", s.getMessage()); + } + } + + public void testQueueSessionCannotCreateDurableSubscriber() throws Exception + { + QueueSession queueSession = getQueueSession(); + Topic topic = getTestTopic(); + + try + { + queueSession.createDurableSubscriber(topic, "abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createDurableSubscriber from QueueSession", s.getMessage()); + } + } + + public void testQueueSessionCannoutUnsubscribe() throws Exception + { + QueueSession queueSession = getQueueSession(); + try + { + queueSession.unsubscribe("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call unsubscribe from QueueSession", s.getMessage()); + } + } + + private QueueSession getQueueSession() throws Exception + { + QueueConnection queueConnection = (QueueConnection)getConnection(); + return queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java new file mode 100644 index 0000000000..6397f15e0a --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/TopicSessionFactoryTest.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client; + +import javax.jms.Queue; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Ensures that topic specific session factory method {@link TopicConnection#createTopicSession()} create sessions + * of type {@link TopicSession} and that those sessions correctly restrict the available JMS operations + * operations to exclude those applicable to only queues. + * + * @see QueueSessionFactoryTest + */ +public class TopicSessionFactoryTest extends QpidBrokerTestCase +{ + public void testTopicSessionIsNotAQueueSession() throws Exception + { + TopicSession topicSession = getTopicSession(); + assertFalse(topicSession instanceof QueueSession); + } + + public void testTopicSessionCannotCreateCreateBrowser() throws Exception + { + TopicSession topicSession = getTopicSession(); + Queue queue = getTestQueue(); + try + { + topicSession.createBrowser(queue); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createBrowser from TopicSession", s.getMessage()); + } + } + + public void testTopicSessionCannotCreateQueues() throws Exception + { + TopicSession topicSession = getTopicSession(); + try + { + topicSession.createQueue("abc"); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createQueue from TopicSession", s.getMessage()); + } + } + + public void testTopicSessionCannotCreateTemporaryQueues() throws Exception + { + TopicSession topicSession = getTopicSession(); + try + { + topicSession.createTemporaryQueue(); + fail("expected exception did not occur"); + } + catch (javax.jms.IllegalStateException s) + { + // PASS + assertEquals("Cannot call createTemporaryQueue from TopicSession", s.getMessage()); + } + } + + private TopicSession getTopicSession() throws Exception + { + TopicConnection topicConnection = (TopicConnection)getConnection(); + return topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index f680a20288..9a8da14f83 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -46,6 +46,7 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; +import javax.jms.Topic; import javax.naming.InitialContext; import javax.naming.NamingException; @@ -56,6 +57,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ConnectionURL; @@ -1101,6 +1103,15 @@ public class QpidBrokerTestCase extends QpidTestCase return new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName()); } + /** + * Return a Topic specific for this test. + * Uses getTestQueueName() as the name of the topic + * @return + */ + public Topic getTestTopic() + { + return new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, getTestQueueName()); + } protected void tearDown() throws java.lang.Exception { diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index d8c463b810..9e9bf6779a 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -23,10 +23,6 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateEx // QPID-3576: Java client issue. MessageConsumer#close() time-out. org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testDeleteOptions -//This test requires SSL, but SSL is only enabled for the C++ broker in the cpp.ssl test profile -//which runs *all* the tests with SSL, so this one can be excluded safely enough -org.apache.qpid.test.unit.client.AMQSSLConnectionTest#* - org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* org.apache.qpid.client.ResetMessageListenerTest#* |
