diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-02-14 20:02:03 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-14 20:02:03 +0000 |
| commit | a22f3f594d6eee7d610fb4f140e18cddd7c880f6 (patch) | |
| tree | 5adb376ed217d2debaff1c0bdd59af1a1c93e829 /java/client/src/test | |
| parent | 9cb1922884c5b258c961046e6fd48e5152aa79d5 (diff) | |
| download | qpid-python-a22f3f594d6eee7d610fb4f140e18cddd7c880f6.tar.gz | |
First backmerge from trunk to 0-9 branch for Java. Not all java tests passing yet
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507672 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
38 files changed, 1274 insertions, 164 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java new file mode 100644 index 0000000000..6b03dd32e8 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class MessageListenerMultiConsumerTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(MessageListenerMultiConsumerTest.class); + + Context _context; + + private static final int MSG_COUNT = 6; + private int receivedCount1 = 0; + private int receivedCount2 = 0; + private Connection _clientConnection; + private MessageConsumer _consumer1; + private MessageConsumer _consumer2; + + private boolean _testAsync; + private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer1 = clientSession1.createConsumer(queue); + + //Create Client 2 + Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer2 = clientSession2.createConsumer(queue); + + //Create Producer + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.send(producerSession.createTextMessage("Message " + msg)); + } + + producerConnection.close(); + + _testAsync = false; + } + + protected void tearDown() throws Exception + { + //Should have recieved all async messages + if (_testAsync) + { + assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); + } + _clientConnection.close(); + + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testRecieveC1thenC2() throws Exception + { + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + + assertTrue(_consumer1.receive() != null); + } + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + assertTrue(_consumer2.receive() != null); + } + } + + public void testRecieveInterleaved() throws Exception + { + + for (int msg = 0; msg < MSG_COUNT / 2; msg++) + { + assertTrue(_consumer1.receive() != null); + assertTrue(_consumer2.receive() != null); + } + } + + + public void testAsynchronousRecieve() throws Exception + { + _testAsync = true; + + _consumer1.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message); + + receivedCount1++; + + if (receivedCount1 == MSG_COUNT / 2) + { + _allMessagesSent.countDown(); + } + + } + }); + + _consumer2.setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message); + + receivedCount2++; + if (receivedCount2 == MSG_COUNT / 2) + { + _allMessagesSent.countDown(); + } + } + }); + + + _logger.info("Waiting upto 2 seconds for messages"); + + try + { + _allMessagesSent.await(2000, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + //do nothing + } + + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java new file mode 100644 index 0000000000..0739acfabd --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Message; +import javax.jms.ConnectionFactory; +import javax.naming.Context; +import javax.naming.spi.InitialContextFactory; +import java.util.Hashtable; + +/** + * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue + * <p/> + * The message delivery process: + * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s + * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start + * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a + * session can run in any order and a synchronous put/poll will block the dispatcher). + * <p/> + * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered + * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. + */ +public class MessageListenerTest extends TestCase implements MessageListener +{ + private static final Logger _logger = Logger.getLogger(MessageListenerTest.class); + + Context _context; + + private static final int MSG_COUNT = 5; + private int receivedCount = 0; + private MessageConsumer _consumer; + private Connection _clientConnection; + private boolean _testAsync; + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'"); + env.put("queue.queue", "direct://amq.direct//MessageListenerTest"); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientConnection.start(); + + Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + + _consumer = clientSession.createConsumer(queue); + + //Create Producer + + Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = producerSession.createProducer(queue); + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + producer.send(producerSession.createTextMessage("Message " + msg)); + } + + producerConnection.close(); + + _testAsync = false; + } + + protected void tearDown() throws Exception + { + //Should have recieved all async messages + if (_testAsync) + { + assertEquals(MSG_COUNT, receivedCount); + } + _clientConnection.close(); + + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testSynchronousRecieve() throws Exception + { + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertTrue(_consumer.receive(2000) != null); + } + } + + public void testAsynchronousRecieve() throws Exception + { + _testAsync = true; + + _consumer.setMessageListener(this); + + + _logger.info("Waiting 3 seconds for messages"); + + try + { + Thread.sleep(2000); + } + catch (InterruptedException e) + { + //do nothing + } + + } + + public void onMessage(Message message) + { + _logger.info("Received Message(" + receivedCount + "):" + message); + + receivedCount++; + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(MessageListenerTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java b/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java new file mode 100644 index 0000000000..f7bea1b36a --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java @@ -0,0 +1,231 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import javax.jms.*; +import java.util.Enumeration; +import java.io.Serializable; + +public class TestNonQpidTextMessage implements ObjectMessage { + + private JMSObjectMessage _realMessage; + private String _contentString; + + /** + * Allows us to construct a JMS message which + * does not inherit from the Qpid message superclasses + * and expand our unit testing of MessageConverter et al + */ + public TestNonQpidTextMessage() + { + _realMessage = new JMSObjectMessage(); + } + + public String getJMSMessageID() throws JMSException { + return _realMessage.getJMSMessageID(); + } + + public void setJMSMessageID(String string) throws JMSException { + _realMessage.setJMSMessageID(string); + } + + public long getJMSTimestamp() throws JMSException { + return _realMessage.getJMSTimestamp(); + } + + public void setJMSTimestamp(long l) throws JMSException { + _realMessage.setJMSTimestamp(l); + } + + public byte[] getJMSCorrelationIDAsBytes() throws JMSException { + return _realMessage.getJMSCorrelationIDAsBytes(); + } + + public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException { + _realMessage.setJMSCorrelationIDAsBytes(bytes); + } + + public void setJMSCorrelationID(String string) throws JMSException { + _realMessage.setJMSCorrelationID(string); + } + + public String getJMSCorrelationID() throws JMSException { + return _realMessage.getJMSCorrelationID(); + } + + public Destination getJMSReplyTo() throws JMSException { + return _realMessage.getJMSReplyTo(); + } + + public void setJMSReplyTo(Destination destination) throws JMSException { + _realMessage.setJMSReplyTo(destination); + } + + public Destination getJMSDestination() throws JMSException { + return _realMessage.getJMSDestination(); + } + + public void setJMSDestination(Destination destination) throws JMSException { + _realMessage.setJMSDestination(destination); + } + + public int getJMSDeliveryMode() throws JMSException { + return _realMessage.getJMSDeliveryMode(); + } + + public void setJMSDeliveryMode(int i) throws JMSException { + _realMessage.setJMSDeliveryMode(i); + } + + public boolean getJMSRedelivered() throws JMSException { + return _realMessage.getJMSRedelivered(); + } + + public void setJMSRedelivered(boolean b) throws JMSException { + _realMessage.setJMSRedelivered(b); + } + + public String getJMSType() throws JMSException { + return _realMessage.getJMSType(); + } + + public void setJMSType(String string) throws JMSException { + _realMessage.setJMSType(string); + } + + public long getJMSExpiration() throws JMSException { + return _realMessage.getJMSExpiration(); + } + + public void setJMSExpiration(long l) throws JMSException { + _realMessage.setJMSExpiration(l); + } + + public int getJMSPriority() throws JMSException { + return _realMessage.getJMSPriority(); + } + + public void setJMSPriority(int i) throws JMSException { + _realMessage.setJMSPriority(i); + } + + public void clearProperties() throws JMSException { + _realMessage.clearProperties(); + } + + public boolean propertyExists(String string) throws JMSException { + return _realMessage.propertyExists(string); + } + + public boolean getBooleanProperty(String string) throws JMSException { + return _realMessage.getBooleanProperty(string); + } + + public byte getByteProperty(String string) throws JMSException { + return _realMessage.getByteProperty(string); + } + + public short getShortProperty(String string) throws JMSException { + return _realMessage.getShortProperty(string); + } + + public int getIntProperty(String string) throws JMSException { + return _realMessage.getIntProperty(string); + } + + public long getLongProperty(String string) throws JMSException { + return _realMessage.getLongProperty(string); + } + + public float getFloatProperty(String string) throws JMSException { + return _realMessage.getFloatProperty(string); + } + + public double getDoubleProperty(String string) throws JMSException { + return _realMessage.getDoubleProperty(string); + } + + public String getStringProperty(String string) throws JMSException { + return _realMessage.getStringProperty(string); + } + + public Object getObjectProperty(String string) throws JMSException { + return _realMessage.getObjectProperty(string); + } + + public Enumeration getPropertyNames() throws JMSException { + return _realMessage.getPropertyNames(); + } + + public void setBooleanProperty(String string, boolean b) throws JMSException { + _realMessage.setBooleanProperty(string,b); + } + + public void setByteProperty(String string, byte b) throws JMSException { + _realMessage.setByteProperty(string,b); + } + + public void setShortProperty(String string, short i) throws JMSException { + _realMessage.setShortProperty(string,i); + } + + public void setIntProperty(String string, int i) throws JMSException { + _realMessage.setIntProperty(string,i); + } + + public void setLongProperty(String string, long l) throws JMSException { + _realMessage.setLongProperty(string,l); + } + + public void setFloatProperty(String string, float v) throws JMSException { + _realMessage.setFloatProperty(string,v); + } + + public void setDoubleProperty(String string, double v) throws JMSException { + _realMessage.setDoubleProperty(string,v); + } + + public void setStringProperty(String string, String string1) throws JMSException { + _realMessage.setStringProperty(string,string1); + } + + public void setObjectProperty(String string, Object object) throws JMSException { + _realMessage.setObjectProperty(string,object); + } + + public void acknowledge() throws JMSException { + _realMessage.acknowledge(); + } + + public void clearBody() throws JMSException { + _realMessage.clearBody(); + } + + public void setObject(Serializable serializable) throws JMSException { + if (serializable instanceof String) + { + _contentString = (String)serializable; + } + } + + public Serializable getObject() throws JMSException { + return _contentString; } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index d12ab01bdc..4a8c0145c4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -25,37 +25,46 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.exchange.ExchangeDefaults; import javax.jms.*; +import java.util.concurrent.atomic.AtomicInteger; public class RecoverTest extends TestCase { private static final Logger _logger = Logger.getLogger(RecoverTest.class); + private Exception _error; + private AtomicInteger count; + protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); + _error = null; + count = new AtomicInteger(); } protected void tearDown() throws Exception { super.tearDown(); TransportConnection.killAllVMBrokers(); + count = null; } public void testRecoverResendsMsgs() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue("someQ", "someQ", false, true); + Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); @@ -104,15 +113,15 @@ public class RecoverTest extends TestCase public void testRecoverResendsMsgsAckOnEarlier() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue("someQ", "someQ", false, true); + Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); @@ -168,15 +177,15 @@ public class RecoverTest extends TestCase public void testAcknowledgePerConsumer() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue("Q1", "Q1", false, true); - Queue queue2 = new AMQQueue("Q2", "Q2", false, true); + Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + Queue queue2 = new AMQQueue(new AMQShortString("Q2"), new AMQShortString("Q2"), false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageConsumer consumer2 = consumerSession.createConsumer(queue2); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); MessageProducer producer2 = producerSession.createProducer(queue2); @@ -207,41 +216,96 @@ public class RecoverTest extends TestCase public void testRecoverInAutoAckListener() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = new AMQQueue("Q1", "Q1", false, true); + Queue queue = new AMQQueue(new AMQShortString("Q3"), new AMQShortString("Q3"), false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); MessageProducer producer = consumerSession.createProducer(queue); producer.send(consumerSession.createTextMessage("hello")); - MessageConsumer consumer = consumerSession.createConsumer(queue); + + + final Object lock = new Object(); + consumer.setMessageListener(new MessageListener() { - private int count = 0; + + public void onMessage(Message message) { try { - if (count++ == 0) + count.incrementAndGet(); + if (count.get() == 1) { - assertFalse(message.getJMSRedelivered()); + if(message.getJMSRedelivered()) + { + setError(new Exception("Message marked as redilvered on what should be first delivery attempt")); + } consumerSession.recover(); } - else if (count++ == 1) + else if (count.get() == 2) { - assertTrue(message.getJMSRedelivered()); + if(!message.getJMSRedelivered()) + { + setError(new Exception("Message not marked as redilvered on what should be second delivery attempt")); + } } else { - fail("Message delivered too many times!"); + System.err.println(message); + fail("Message delivered too many times!: " + count); } } catch (JMSException e) { _logger.error("Error recovering session: " + e, e); + setError(e); + } + synchronized(lock) + { + lock.notify(); } } }); + + con.start(); + + long waitTime = 300000L; + long waitUntilTime = System.currentTimeMillis() + waitTime; + + synchronized(lock) + { + while((count.get() <= 1) && (waitTime > 0)) + { + lock.wait(waitTime); + if(count.get() <= 1) + { + waitTime = waitUntilTime - System.currentTimeMillis(); + } + } + } + + Thread.sleep(1000); + + if(count.get() != 2) + { + System.err.println("Count != 2 : " + count); + } + assertTrue(count.get() == 2); + + con.close(); + + if(_error != null) + { + throw _error; + } + } + + private void setError(Exception e) + { + _error = e; } public static junit.framework.Test suite() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java index 59be38f0dd..cf5b5c76e5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java @@ -48,7 +48,7 @@ public class BytesMessageTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java index ad180e3a89..fb347053c7 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java @@ -35,16 +35,20 @@ import junit.framework.TestCase; public class FieldTableKeyEnumeratorTest extends TestCase { + public void testTrue() + { + + } public void testKeyEnumeration() { FieldTable result = FieldTableFactory.newFieldTable(); - result.put("one", 1L); - result.put("two", 2L); - result.put("three", 3L); - result.put("four", 4L); - result.put("five", 5L); + result.setObject("one", 1L); + result.setObject("two", 2L); + result.setObject("three", 3L); + result.setObject("four", 4L); + result.setObject("five", 5L); - Iterator iterator = result.keySet().iterator(); + Iterator iterator = result.keys().iterator(); try { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java index f4efd64dbb..d1e90e7bcd 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java @@ -54,7 +54,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception @@ -85,11 +85,11 @@ public class FieldTableMessageTest extends TestCase implements MessageListener private FieldTable load() throws IOException { FieldTable result = FieldTableFactory.newFieldTable(); - result.put("one", 1L); - result.put("two", 2L); - result.put("three", 3L); - result.put("four", 4L); - result.put("five", 5L); + result.setLong("one", 1L); + result.setLong("two", 2L); + result.setLong("three", 3L); + result.setLong("four", 4L); + result.setLong("five", 5L); return result; } @@ -133,10 +133,9 @@ public class FieldTableMessageTest extends TestCase implements MessageListener { ByteBuffer buffer = ((JMSBytesMessage) m).getData(); FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining()); - for (Object o : _expected.keySet()) - { - String key = (String) o; - assertEquals("Values for " + key + " did not match", _expected.get(key), actual.get(key)); + for (String key : _expected.keys()) + { + assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key)); } } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java index bc2def1c64..29770704c5 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java @@ -56,7 +56,7 @@ public class MapMessageTest extends TestCase implements MessageListener try { TransportConnection.createVMBroker(1); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } catch (Exception e) { @@ -144,13 +144,29 @@ public class MapMessageTest extends TestCase implements MessageListener } - void waitFor(int count) throws InterruptedException + void waitFor(int count) throws Exception { + long waitTime = 30000L; + long waitUntilTime = System.currentTimeMillis() + 30000L; + + synchronized(received) { - while (received.size() < count) + while(received.size() < count && waitTime>0) + { + if (received.size() < count) + { + received.wait(waitTime); + } + + if (received.size() < count) + { + waitTime = waitUntilTime - System.currentTimeMillis(); + } + } + if (received.size() < count) { - received.wait(); + throw new Exception("Timed-out. Waiting for " + count + " only got " + received.size()); } } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java index 1e9de221d4..66d82a991e 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java @@ -41,7 +41,7 @@ public class MultipleConnectionTest extends TestCase Receiver(String broker, AMQDestination dest, int sessions) throws Exception { - this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest, sessions); + this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "test"), dest, sessions); } Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception @@ -72,7 +72,7 @@ public class MultipleConnectionTest extends TestCase Publisher(String broker, AMQDestination dest) throws Exception { - this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest); + this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "test"), dest); } Publisher(AMQConnection connection, AMQDestination dest) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java index 3f726ae5ab..dc1aadaa6c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java @@ -50,7 +50,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener TransportConnection.createVMBroker(1); try { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } catch (Exception e) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index 17679788bd..d0126e1917 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -29,12 +29,7 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.testutil.VMBrokerSetup; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; +import javax.jms.*; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -58,7 +53,7 @@ public class PropertyValueTest extends TestCase implements MessageListener super.setUp(); try { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } catch (Exception e) { @@ -81,7 +76,7 @@ public class PropertyValueTest extends TestCase implements MessageListener { _connection = connection; _destination = destination; - _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //set up a slow consumer _session.createConsumer(destination).setMessageListener(this); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java new file mode 100644 index 0000000000..c54ea8c6e6 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.basic; + +import junit.framework.TestCase; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.BasicMessageProducer; + +import javax.jms.*; + +/** + * @author Apache Software Foundation + */ +public class PubSubTwoConnectionRefTest extends TestCase +{ + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + /** + * This tests that a consumer is set up synchronously + * @throws Exception + */ + public void testTwoConnections() throws Exception + { + AMQTopic topic = new AMQTopic("MyTopic"); + AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test"); + AMQSession session1 = con1.createAMQSession(false, AMQSession.NO_ACKNOWLEDGE); + BasicMessageProducer producer = session1.createBasicProducer(topic); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test"); + Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE); + MessageConsumer consumer = session2.createConsumer(topic); + con2.start(); + producer.sendRef(session1.createTextMessage("Hello ref")); +// producer.sendRef(session1.createTextMessage("Goodbye ref")); + TextMessage tm1 = (TextMessage) consumer.receive(2000); + assertNotNull(tm1); + assertEquals("Hello ref", tm1.getText()); +// assertEquals("Goodbye ref", tm1.getText()); + } + + public static void main(String[] args){ + PubSubTwoConnectionRefTest test = new PubSubTwoConnectionRefTest(); + try { + test.setUp(); + test.testTwoConnections(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java index b853963c96..937944e340 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java @@ -51,11 +51,11 @@ public class PubSubTwoConnectionTest extends TestCase public void testTwoConnections() throws Exception { Topic topic = new AMQTopic("MyTopic"); - Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "/test_path"); + Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test"); Session session1 = con1.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageProducer producer = session1.createProducer(topic); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "/test_path"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test"); Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer = session2.createConsumer(topic); con2.start(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java index 302551b05c..1db62cffa9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java @@ -48,7 +48,7 @@ public class ReceiveTest extends TestCase { createVMBroker(); String broker = _connectionString; - init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "/test_path")); + init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "test")); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java index 27a2ccb32e..fe15e151a3 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -51,7 +51,7 @@ public class SelectorTest extends TestCase implements MessageListener { super.setUp(); TransportConnection.createVMBroker(1); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java index 726c7e39d7..cce02accd8 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java @@ -43,7 +43,7 @@ public class SessionStartTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java index 903f6a9da9..b50cd39780 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java @@ -52,7 +52,7 @@ public class TextMessageTest extends TestCase implements MessageListener super.setUp(); try { - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path")); + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } catch (Exception e) { @@ -75,7 +75,7 @@ public class TextMessageTest extends TestCase implements MessageListener { _connection = connection; _destination = destination; - _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); + _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //set up a slow consumer _session.createConsumer(destination).setMessageListener(this); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java index 68bdc6ddf2..db4e18a4a1 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java @@ -24,7 +24,7 @@ import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.client.transport.TransportConnection; import javax.jms.*; @@ -41,13 +41,15 @@ public class AMQConnectionTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test"); + TransportConnection.createVMBroker(1); + _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test"); _topic = new AMQTopic("mytopic"); _queue = new AMQQueue("myqueue"); } protected void tearDown() throws Exception { + super.tearDown(); try { _connection.close(); @@ -55,8 +57,8 @@ public class AMQConnectionTest extends TestCase catch (JMSException e) { //ignore - } - super.tearDown(); + } + TransportConnection.killAllVMBrokers(); } /** @@ -195,6 +197,6 @@ public class AMQConnectionTest extends TestCase public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(AMQConnectionTest.class)); + return new junit.framework.TestSuite(AMQConnectionTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java index 67c4f1dd6b..b01a129bf2 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java @@ -46,7 +46,7 @@ public class AMQSessionTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test"); + _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test"); _topic = new AMQTopic("mytopic"); _queue = new AMQQueue("myqueue"); _session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index ac789eb915..05d83be47f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -20,17 +20,17 @@ */ package org.apache.qpid.test.unit.client.channelclose; +import junit.framework.TestCase; +import junit.textui.TestRunner; +import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.testutil.VMBrokerSetup; -import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; import javax.jms.*; import java.util.ArrayList; import java.util.List; -import junit.framework.TestCase; -import junit.textui.TestRunner; /** * Due to bizarre exception handling all sessions are closed if you get @@ -64,7 +64,8 @@ public class ChannelCloseOkTest extends TestCase { super.setUp(); - _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"); + TransportConnection.createVMBroker(1); + _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"); _destination1 = new AMQQueue("q1", true); _destination2 = new AMQQueue("q2", true); @@ -192,7 +193,15 @@ public class ChannelCloseOkTest extends TestCase { while (received.size() < count) { - received.wait(); + try + { + received.wait(); + } + catch (InterruptedException e) + { + _log.info("Interrupted: " + e); + throw e; + } } } } @@ -209,6 +218,6 @@ public class ChannelCloseOkTest extends TestCase public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(ChannelCloseOkTest.class)); + return new junit.framework.TestSuite(ChannelCloseOkTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java index 0b3ed931f8..7a665daeb3 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -49,7 +49,7 @@ public class CloseWithBlockingReceiveTest extends TestCase public void testReceiveReturnsNull() throws Exception { final Connection connection = new AMQConnection("vm://:1", "guest", "guest", - "fred", "/test"); + "fred", "test"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(new AMQTopic("banana")); connection.start(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index c7fc3efc87..2ee9fad5d4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -26,6 +26,7 @@ import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionFailureException; import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.AMQConnectionFailureException; import javax.jms.Connection; @@ -47,14 +48,15 @@ public class ConnectionTest extends TestCase protected void tearDown() throws Exception { - TransportConnection.killAllVMBrokers(); + TransportConnection.killVMBroker(1); } public void testSimpleConnection() { try { - new AMQConnection(_broker, "guest", "guest", "fred", "/test"); + AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); + conn.close(); } catch (Exception e) { @@ -93,6 +95,7 @@ public class ConnectionTest extends TestCase fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe); } } + } public void testUnresolvedHostFailure() throws Exception @@ -114,7 +117,7 @@ public class ConnectionTest extends TestCase public void testClientIdCannotBeChanged() throws Exception { Connection connection = new AMQConnection(_broker, "guest", "guest", - "fred", "/test"); + "fred", "test"); try { connection.setClientID("someClientId"); @@ -129,7 +132,7 @@ public class ConnectionTest extends TestCase public void testClientIdIsPopulatedAutomatically() throws Exception { Connection connection = new AMQConnection(_broker, "guest", "guest", - null, "/test"); + null, "test"); assertNotNull(connection.getClientID()); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 147d2ae43e..a23c78822f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -33,14 +33,14 @@ public class ConnectionURLTest extends TestCase public void testFailoverURL() throws URLSyntaxException { - String url = "amqp://ritchiem:bob@/temp?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'"; + String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod().equals("roundrobin")); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("bob")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getBrokerCount() == 2); @@ -60,14 +60,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportUsernamePasswordURL() throws URLSyntaxException { - String url = "amqp://ritchiem:bob@/temp?brokerlist='tcp://localhost:5672'"; + String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("bob")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -80,14 +80,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportUsernameBlankPasswordURL() throws URLSyntaxException { - String url = "amqp://ritchiem:@/temp?brokerlist='tcp://localhost:5672'"; + String url = "amqp://ritchiem:@/test?brokerlist='tcp://localhost:5672'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -100,7 +100,7 @@ public class ConnectionURLTest extends TestCase public void testFailedURLNullPassword() { - String url = "amqp://ritchiem@/temp?brokerlist='tcp://localhost:5672'"; + String url = "amqp://ritchiem@/test?brokerlist='tcp://localhost:5672'"; try { @@ -140,7 +140,7 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportWithClientURLURL() throws URLSyntaxException { - String url = "amqp://guest:guest@clientname/temp?brokerlist='tcp://localhost:5672'"; + String url = "amqp://guest:guest@clientname/test?brokerlist='tcp://localhost:5672'"; ConnectionURL connectionurl = new AMQConnectionURL(url); @@ -148,7 +148,7 @@ public class ConnectionURLTest extends TestCase assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getClientName().equals("clientname")); @@ -164,14 +164,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransport1OptionURL() throws URLSyntaxException { - String url = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',routingkey='jim'"; + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -187,14 +187,14 @@ public class ConnectionURLTest extends TestCase public void testSingleTransportDefaultedBroker() throws URLSyntaxException { - String url = "amqp://guest:guest@/temp?brokerlist='localhost'"; + String url = "amqp://guest:guest@/test?brokerlist='localhost'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -207,17 +207,83 @@ public class ConnectionURLTest extends TestCase assertTrue(service.getPort() == 5672); } + public void testSingleTransportDefaultedBrokerWithPort() throws URLSyntaxException + { + String url = "amqp://guest:guest@/test?brokerlist='localhost:1234'"; + + ConnectionURL connectionurl = new AMQConnectionURL(url); + + assertTrue(connectionurl.getFailoverMethod() == null); + assertTrue(connectionurl.getUsername().equals("guest")); + assertTrue(connectionurl.getPassword().equals("guest")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); + + + assertTrue(connectionurl.getBrokerCount() == 1); + + BrokerDetails service = connectionurl.getBrokerDetails(0); + + assertTrue(service.getTransport().equals("tcp")); + + assertTrue(service.getHost().equals("localhost")); + assertTrue(service.getPort() == 1234); + } + + public void testSingleTransportDefaultedBrokerWithIP() throws URLSyntaxException + { + String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1'"; + + ConnectionURL connectionurl = new AMQConnectionURL(url); + + assertTrue(connectionurl.getFailoverMethod() == null); + assertTrue(connectionurl.getUsername().equals("guest")); + assertTrue(connectionurl.getPassword().equals("guest")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); + + + assertTrue(connectionurl.getBrokerCount() == 1); + + BrokerDetails service = connectionurl.getBrokerDetails(0); + + assertTrue(service.getTransport().equals("tcp")); + + assertTrue(service.getHost().equals("127.0.0.1")); + assertTrue(service.getPort() == 5672); + } + + public void testSingleTransportDefaultedBrokerWithIPandPort() throws URLSyntaxException + { + String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1:1234'"; + +// ConnectionURL connectionurl = new AMQConnectionURL(url); +// +// assertTrue(connectionurl.getFailoverMethod() == null); +// assertTrue(connectionurl.getUsername().equals("guest")); +// assertTrue(connectionurl.getPassword().equals("guest")); +// assertTrue(connectionurl.getVirtualHost().equals("/temp")); +// +// +// assertTrue(connectionurl.getBrokerCount() == 1); +// +// BrokerDetails service = connectionurl.getBrokerDetails(0); +// +// assertTrue(service.getTransport().equals("tcp")); +// +// assertTrue(service.getHost().equals("127.0.0.1")); +// assertTrue(service.getPort() == 1234); + } + public void testSingleTransportMultiOptionURL() throws URLSyntaxException { - String url = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'"; + String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -235,14 +301,14 @@ public class ConnectionURLTest extends TestCase public void testSinglevmURL() throws URLSyntaxException { - String url = "amqp://guest:guest@/messages?brokerlist='vm://:2'"; + String url = "amqp://guest:guest@/test?brokerlist='vm://:2'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod() == null); assertTrue(connectionurl.getUsername().equals("guest")); assertTrue(connectionurl.getPassword().equals("guest")); - assertTrue(connectionurl.getVirtualHost().equals("/messages")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getBrokerCount() == 1); @@ -256,14 +322,14 @@ public class ConnectionURLTest extends TestCase public void testFailoverVMURL() throws URLSyntaxException { - String url = "amqp://ritchiem:bob@/temp?brokerlist='vm://:2;vm://:3',failover='roundrobin'"; + String url = "amqp://ritchiem:bob@/test?brokerlist='vm://:2;vm://:3',failover='roundrobin'"; ConnectionURL connectionurl = new AMQConnectionURL(url); assertTrue(connectionurl.getFailoverMethod().equals("roundrobin")); assertTrue(connectionurl.getUsername().equals("ritchiem")); assertTrue(connectionurl.getPassword().equals("bob")); - assertTrue(connectionurl.getVirtualHost().equals("/temp")); + assertTrue(connectionurl.getVirtualHost().equals("/test")); assertTrue(connectionurl.getBrokerCount() == 2); @@ -309,7 +375,6 @@ public class ConnectionURLTest extends TestCase } - public void testWrongOptionSeparatorInOptions() { String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672;tcp://localhost:5673'+failover='roundrobin'"; diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java index 6c2c684362..db0d3e0eab 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java @@ -65,29 +65,35 @@ public class Client implements MessageListener _connection.close(); } - public void onMessage(Message response) + public synchronized void onMessage(Message response) { + System.out.println("Received " + (++_count) + " of " + _expected + " responses."); if(_count == _expected) { - synchronized(this) - { - notifyAll(); - } + + notifyAll(); } + + } - synchronized void waitUntilComplete() throws InterruptedException + synchronized void waitUntilComplete() throws Exception { - while(_count < _expected) + + if(_count < _expected) + { + wait(10000L); + } + if(_count < _expected) { - wait(); + throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected); } } static AMQConnection connect(String broker) throws Exception { - return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path"); + return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test"); } public static void main(String[] argv) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java index a1c64e2246..58f9c6fc19 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java @@ -73,7 +73,7 @@ public class Service implements MessageListener static AMQConnection connect(String broker) throws Exception { - return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path"); + return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test"); } // public static void main(String[] argv) throws Exception diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java index 22015dbc93..691acbb213 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java @@ -21,6 +21,7 @@ package org.apache.qpid.test.unit.client.forwardall; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.framing.AMQShortString; /** * Queue that allows several private queues to be registered and bound @@ -29,15 +30,19 @@ import org.apache.qpid.client.AMQQueue; */ class SpecialQueue extends AMQQueue { - private final String name; + private final AMQShortString name; SpecialQueue(String name) { + this(new AMQShortString(name)); + } + SpecialQueue(AMQShortString name) + { super(name, true); this.name = name; } - public String getRoutingKey() + public AMQShortString getRoutingKey() { return name; } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java index bbd1870168..0e4603ed24 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java @@ -54,7 +54,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener protected void setUp() throws Exception { super.setUp(); - connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "/test_path"); + connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "test"); destination = new AMQQueue(randomize("LatencyTest"), true); session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -101,6 +101,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener } catch (Exception e) { + e.printStackTrace(); fail("This Test should succeed but failed due to: " + e); } finally @@ -236,7 +237,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener public void onMessage(Message message) { - received++; + try { if (message instanceof ObjectMessage) @@ -255,13 +256,11 @@ public class ObjectMessageTest extends TestCase implements MessageListener items.add(e); } - if (waiting) - { synchronized(this) { + received++; notify(); } - } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index eee9b2de9f..64898a1b9a 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.test.unit.client.protocol; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.framing.AMQShortString; import org.apache.mina.common.IoSession; import junit.framework.TestCase; @@ -45,7 +46,7 @@ public class AMQProtocolSessionTest extends TestCase return (TestIoSession) _minaProtocolSession; } - public String genQueueName() + public AMQShortString genQueueName() { return generateQueueName(); } @@ -80,26 +81,26 @@ public class AMQProtocolSessionTest extends TestCase public void testGenerateQueueName() { - String testAddress; + AMQShortString testAddress; - //test address with / and ; chars which generateQueueName should remove + //test address with / and ; chars which generateQueueName should removeKey _testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress); _testSession.getMinaProtocolSession().setLocalPort(_port); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress); + assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString()); //test empty address _testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress); + assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString()); //test address with no special chars _testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress); testAddress = _testSession.genQueueName(); - assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress); + assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString()); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java index 6c064e3853..b6c539d91c 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -28,7 +28,7 @@ public class TemporaryQueueTest extends TestCase protected Connection createConnection() throws AMQException, URLSyntaxException
{
return new AMQConnection(_broker, "guest", "guest",
- "fred", "/test");
+ "fred", "test");
}
public void testTempoaryQueue() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java index c9240e9be7..7cbd4e8bdd 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java @@ -51,7 +51,7 @@ public class TopicPublisherCloseTest extends TestCase public void testAllMethodsThrowAfterConnectionClose() throws Exception { - AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "/test_path"); + AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "test"); Topic destination1 = new AMQTopic("t1"); TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java index c14b5317c7..1f53d7de65 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java @@ -11,6 +11,7 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
@@ -39,12 +40,12 @@ public class JMSDestinationTest extends TestCase public void testJMSDestination() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java new file mode 100644 index 0000000000..c09d2504eb --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.message; + +import junit.framework.TestCase; +import org.apache.log4j.Logger; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.message.TestNonQpidTextMessage; +import org.apache.qpid.framing.AMQShortString; + +import javax.jms.*; + +/** + * @author Apache Software Foundation + */ +public class JMSPropertiesTest extends TestCase +{ + + private static final Logger _logger = Logger.getLogger(JMSPropertiesTest.class); + + public String _connectionString = "vm://:1"; + + public static final String JMS_CORR_ID = "QPIDID_01"; + public static final int JMS_DELIV_MODE = 1; + public static final String JMS_TYPE = "test.jms.type"; + public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto"); + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + public void testJMSProperties() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + //create a test message to send + ObjectMessage sentMsg = new TestNonQpidTextMessage(); + sentMsg.setJMSCorrelationID(JMS_CORR_ID); + sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE); + sentMsg.setJMSType(JMS_TYPE); + sentMsg.setJMSReplyTo(JMS_REPLY_TO); + + //send it + producer.send(sentMsg); + + con2.close(); + + con.start(); + + //get message and check JMS properties + ObjectMessage rm = (ObjectMessage) consumer.receive(); + assertNotNull(rm); + + assertEquals("JMS Correlation ID mismatch",sentMsg.getJMSCorrelationID(),rm.getJMSCorrelationID()); + //TODO: Commented out as always overwritten by send delivery mode value - prob should not set in conversion + //assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode()); + assertEquals("JMS Type mismatch",sentMsg.getJMSType(),rm.getJMSType()); + assertEquals("JMS Reply To mismatch",sentMsg.getJMSReplyTo(),rm.getJMSReplyTo()); + + con.close(); + } + +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java new file mode 100644 index 0000000000..6a335b8627 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.message; + +import junit.framework.TestCase; +import org.apache.qpid.client.message.MessageConverter; +import org.apache.qpid.client.message.JMSTextMessage; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.JMSMapMessage; +import org.apache.qpid.client.AMQQueue; + +import javax.jms.Message; +import javax.jms.Destination; +import javax.jms.TextMessage; +import javax.jms.MapMessage; +import java.util.HashMap; + + +public class MessageConverterTest extends TestCase { + + public static final String JMS_CORR_ID = "QPIDID_01"; + public static final int JMS_DELIV_MODE = 1; + public static final String JMS_TYPE = "test.jms.type"; + public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto"); + + protected JMSTextMessage testTextMessage; + + protected JMSMapMessage testMapMessage; + + protected void setUp() throws Exception + { + super.setUp(); + testTextMessage = new JMSTextMessage(); + + //Add JMSProperties + testTextMessage.setJMSCorrelationID(JMS_CORR_ID); + testTextMessage.setJMSDeliveryMode(JMS_DELIV_MODE); + testTextMessage.setJMSType(JMS_TYPE); + testTextMessage.setJMSReplyTo(JMS_REPLY_TO); + testTextMessage.setText("testTextMessage text"); + + //Add non-JMS properties + testTextMessage.setStringProperty("testProp1","testValue1"); + testTextMessage.setDoubleProperty("testProp2",Double.MIN_VALUE); + + testMapMessage = new JMSMapMessage(); + testMapMessage.setString("testMapString","testMapStringValue"); + testMapMessage.setDouble("testMapDouble",Double.MAX_VALUE); + } + + public void testSetProperties() throws Exception + { + AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage(); + + //check JMS prop values on newMessage match + assertEquals("JMS Correlation ID mismatch",testTextMessage.getJMSCorrelationID(),newMessage.getJMSCorrelationID()); + assertEquals("JMS Delivery mode mismatch",testTextMessage.getJMSDeliveryMode(),newMessage.getJMSDeliveryMode()); + assertEquals("JMS Type mismatch",testTextMessage.getJMSType(),newMessage.getJMSType()); + assertEquals("JMS Reply To mismatch",testTextMessage.getJMSReplyTo(),newMessage.getJMSReplyTo()); + + //check non-JMS standard props ok too + assertEquals("Test String prop value mismatch",testTextMessage.getStringProperty("testProp1"), + newMessage.getStringProperty("testProp1")); + assertEquals("Test Double prop value mismatch",testTextMessage.getDoubleProperty("testProp2"), + newMessage.getDoubleProperty("testProp2")); + } + + public void testJMSTextMessageConversion() throws Exception + { + AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage(); + assertEquals("Converted message text mismatch",((JMSTextMessage)newMessage).getText(),testTextMessage.getText()); + } + + public void testJMSMapMessageConversion() throws Exception + { + AbstractJMSMessage newMessage = new MessageConverter((MapMessage)testMapMessage).getConvertedMessage(); + assertEquals("Converted map message String mismatch",((JMSMapMessage)newMessage).getString("testMapString"), + testMapMessage.getString("testMapString")); + assertEquals("Converted map message Double mismatch",((JMSMapMessage)newMessage).getDouble("testMapDouble"), + testMapMessage.getDouble("testMapDouble")); + + } + + protected void tearDown() throws Exception + { + super.tearDown(); + testTextMessage = null; + } + + +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 184d7cb015..7d83d19d74 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -1,11 +1,23 @@ -/** - * User: Robert Greig - * Date: 12-Dec-2006 - ****************************************************************************** - * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of - * this program may be photocopied reproduced or translated to another - * program language without prior written consent of JP Morgan Chase Ltd - ******************************************************************************/ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.test.unit.message; import junit.framework.TestCase; @@ -47,7 +59,7 @@ public class StreamMessageTest extends TestCase public void testStreamMessageEOF() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -60,7 +72,7 @@ public class StreamMessageTest extends TestCase //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -101,7 +113,7 @@ public class StreamMessageTest extends TestCase public void testModifyReceivedMessageExpandsBuffer() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); AMQQueue queue = new AMQQueue("testQ"); MessageConsumer consumer = consumerSession.createConsumer(queue); @@ -123,7 +135,7 @@ public class StreamMessageTest extends TestCase } } }); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer mandatoryProducer = producerSession.createProducer(queue); con.start(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index 5ded8aaeef..7e645f1a26 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -25,7 +25,7 @@ import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.client.transport.TransportConnection; import javax.jms.JMSException; import javax.jms.Message; @@ -39,10 +39,23 @@ import junit.framework.TestCase; public class DurableSubscriptionTest extends TestCase { + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); MessageProducer producer = session1.createProducer(topic); @@ -83,7 +96,7 @@ public class DurableSubscriptionTest extends TestCase public void testDurability() throws AMQException, JMSException, URLSyntaxException { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(topic); MessageProducer producer = session1.createProducer(topic); @@ -128,6 +141,6 @@ public class DurableSubscriptionTest extends TestCase public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(DurableSubscriptionTest.class)); + return new junit.framework.TestSuite(DurableSubscriptionTest.class); } } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java index 4ffb3e8459..c4acf15a58 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java @@ -48,7 +48,7 @@ public class TopicPublisherTest extends TestCase public void testUnidentifiedProducer() throws Exception { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(null); MessageConsumer consumer1 = session1.createConsumer(topic); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 794316d2f5..8e883a2184 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -51,7 +51,7 @@ public class TopicSessionTest extends TestCase public void testTopicSubscriptionUnsubscription() throws Exception { AMQTopic topic = new AMQTopic("MyTopic"); - AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0"); TopicPublisher publisher = session1.createPublisher(topic); @@ -97,7 +97,7 @@ public class TopicSessionTest extends TestCase { AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown)); AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown)); - AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); TopicPublisher publisher = session1.createPublisher(null); @@ -112,7 +112,7 @@ public class TopicSessionTest extends TestCase { session1.close(); con.close(); - con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); con.start(); session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); publisher = session1.createPublisher(null); @@ -134,11 +134,11 @@ public class TopicSessionTest extends TestCase public void testUnsubscriptionAfterConnectionClose() throws Exception { AMQTopic topic = new AMQTopic("MyTopic3"); - AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); - AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test"); + AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test"); TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0"); @@ -149,7 +149,7 @@ public class TopicSessionTest extends TestCase assertNotNull(tm); con2.close(); publisher.publish(session1.createTextMessage("Hello2")); - con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test"); + con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test"); session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); sub = session2.createDurableSubscriber(topic, "subscription0"); con2.start(); @@ -163,14 +163,14 @@ public class TopicSessionTest extends TestCase public void testTextMessageCreation() throws Exception { AMQTopic topic = new AMQTopic("MyTopic4"); - AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); MessageConsumer consumer1 = session1.createConsumer(topic); con.start(); TextMessage tm = session1.createTextMessage("Hello"); publisher.publish(tm); - tm = (TextMessage) consumer1.receive(2000); + tm = (TextMessage) consumer1.receive(200000L); assertNotNull(tm); String msgText = tm.getText(); assertEquals("Hello", msgText); @@ -178,7 +178,7 @@ public class TopicSessionTest extends TestCase msgText = tm.getText(); assertNull(msgText); publisher.publish(tm); - tm = (TextMessage) consumer1.receive(2000); + tm = (TextMessage) consumer1.receive(20000000L); assertNotNull(tm); msgText = tm.getText(); assertNull(msgText); @@ -202,7 +202,7 @@ public class TopicSessionTest extends TestCase public void testSendingSameMessage() throws Exception { - AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryTopic topic = session.createTemporaryTopic(); assertNotNull(topic); @@ -219,12 +219,13 @@ public class TopicSessionTest extends TestCase assertNotNull(receivedMessage); assertEquals(sentMessage.getText(),receivedMessage.getText()); + conn.close(); } public void testTemporaryTopic() throws Exception { - AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test"); + AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test"); TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryTopic topic = session.createTemporaryTopic(); assertNotNull(topic); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 3e1fc04626..ce6df83baf 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -23,8 +23,11 @@ package org.apache.qpid.test.unit.transacted; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.mina.util.SessionLog; import org.apache.log4j.Logger; import javax.jms.*; @@ -54,10 +57,11 @@ public class TransactedTest extends TestCase protected void setUp() throws Exception { super.setUp(); - queue1 = new AMQQueue("Q1", "Q1", false, true); + TransportConnection.createVMBroker(1); + queue1 = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); queue2 = new AMQQueue("Q2", false); - con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "/test"); + con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test"); session = con.createSession(true, 0); consumer1 = session.createConsumer(queue1); //Dummy just to create the queue. @@ -66,16 +70,26 @@ public class TransactedTest extends TestCase producer2 = session.createProducer(queue2); con.start(); - prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "/test"); + prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test"); prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); prepProducer1 = prepSession.createProducer(queue1); prepCon.start(); + +// //add some messages +// prepProducer1.send(prepSession.createTextMessage("A")); +// prepProducer1.send(prepSession.createTextMessage("B")); +// prepProducer1.send(prepSession.createTextMessage("C")); +// +// testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test"); +// testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); +// testConsumer2 = testSession.createConsumer(queue2); } protected void tearDown() throws Exception { con.close(); prepCon.close(); + TransportConnection.killAllVMBrokers(); super.tearDown(); } @@ -96,9 +110,9 @@ public class TransactedTest extends TestCase //commit session.commit(); - + testCon.start(); //ensure sent messages can be received and received messages are gone - testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test"); + testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); testConsumer1 = testSession.createConsumer(queue1); testConsumer2 = testSession.createConsumer(queue2); @@ -108,6 +122,7 @@ public class TransactedTest extends TestCase expect("Y", testConsumer2.receive(1000)); expect("Z", testConsumer2.receive(1000)); + testConsumer1 = testSession.createConsumer(queue1); assertTrue(null == testConsumer1.receive(1000)); assertTrue(null == testConsumer2.receive(1000)); testCon.close(); @@ -141,11 +156,12 @@ public class TransactedTest extends TestCase expect("A", consumer1.receive(1000)); expect("B", consumer1.receive(1000)); expect("C", consumer1.receive(1000)); - + testCon.start(); + testConsumer1 = testSession.createConsumer(queue1); //commit session.commit(); - testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test"); + testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test"); testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); testConsumer1 = testSession.createConsumer(queue1); testConsumer2 = testSession.createConsumer(queue2); @@ -164,7 +180,7 @@ public class TransactedTest extends TestCase public void testResendsMsgsAfterSessionClose() throws Exception { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test"); Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE); AMQQueue queue3 = new AMQQueue("Q3", false); @@ -172,7 +188,7 @@ public class TransactedTest extends TestCase //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test"); Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue3); @@ -234,6 +250,7 @@ public class TransactedTest extends TestCase con.close(); con2.close(); + } // This checks that queue Q1 is in fact empty and does not have any stray @@ -251,6 +268,6 @@ public class TransactedTest extends TestCase public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(TransactedTest.class)); + return new junit.framework.TestSuite(TransactedTest.class); } } |
