summaryrefslogtreecommitdiff
path: root/java/client/src/test
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-14 20:02:03 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-14 20:02:03 +0000
commita22f3f594d6eee7d610fb4f140e18cddd7c880f6 (patch)
tree5adb376ed217d2debaff1c0bdd59af1a1c93e829 /java/client/src/test
parent9cb1922884c5b258c961046e6fd48e5152aa79d5 (diff)
downloadqpid-python-a22f3f594d6eee7d610fb4f140e18cddd7c880f6.tar.gz
First backmerge from trunk to 0-9 branch for Java. Not all java tests passing yet
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507672 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/test')
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java213
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java164
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java231
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java106
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java16
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java19
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java24
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java81
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java12
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java23
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java11
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java105
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java24
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java9
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java9
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java13
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java100
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java109
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java36
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java21
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java23
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java37
38 files changed, 1274 insertions, 164 deletions
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
new file mode 100644
index 0000000000..6b03dd32e8
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerMultiConsumerTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(MessageListenerMultiConsumerTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 6;
+ private int receivedCount1 = 0;
+ private int receivedCount2 = 0;
+ private Connection _clientConnection;
+ private MessageConsumer _consumer1;
+ private MessageConsumer _consumer2;
+
+ private boolean _testAsync;
+ private final CountDownLatch _allMessagesSent = new CountDownLatch(2); //all messages Sent Lock
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
+ env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client 1
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ Session clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer1 = clientSession1.createConsumer(queue);
+
+ //Create Client 2
+ Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer2 = clientSession2.createConsumer(queue);
+
+ //Create Producer
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(producerSession.createTextMessage("Message " + msg));
+ }
+
+ producerConnection.close();
+
+ _testAsync = false;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ //Should have recieved all async messages
+ if (_testAsync)
+ {
+ assertEquals(MSG_COUNT, receivedCount1 + receivedCount2);
+ }
+ _clientConnection.close();
+
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testRecieveC1thenC2() throws Exception
+ {
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+
+ assertTrue(_consumer1.receive() != null);
+ }
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(_consumer2.receive() != null);
+ }
+ }
+
+ public void testRecieveInterleaved() throws Exception
+ {
+
+ for (int msg = 0; msg < MSG_COUNT / 2; msg++)
+ {
+ assertTrue(_consumer1.receive() != null);
+ assertTrue(_consumer2.receive() != null);
+ }
+ }
+
+
+ public void testAsynchronousRecieve() throws Exception
+ {
+ _testAsync = true;
+
+ _consumer1.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message);
+
+ receivedCount1++;
+
+ if (receivedCount1 == MSG_COUNT / 2)
+ {
+ _allMessagesSent.countDown();
+ }
+
+ }
+ });
+
+ _consumer2.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message);
+
+ receivedCount2++;
+ if (receivedCount2 == MSG_COUNT / 2)
+ {
+ _allMessagesSent.countDown();
+ }
+ }
+ });
+
+
+ _logger.info("Waiting upto 2 seconds for messages");
+
+ try
+ {
+ _allMessagesSent.await(2000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
new file mode 100644
index 0000000000..0739acfabd
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.ConnectionFactory;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+/**
+ * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue
+ * <p/>
+ * The message delivery process:
+ * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s
+ * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start
+ * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a
+ * session can run in any order and a synchronous put/poll will block the dispatcher).
+ * <p/>
+ * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered
+ * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first.
+ */
+public class MessageListenerTest extends TestCase implements MessageListener
+{
+ private static final Logger _logger = Logger.getLogger(MessageListenerTest.class);
+
+ Context _context;
+
+ private static final int MSG_COUNT = 5;
+ private int receivedCount = 0;
+ private MessageConsumer _consumer;
+ private Connection _clientConnection;
+ private boolean _testAsync;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://client:client@MLT_ID/test?brokerlist='vm://:1'");
+ env.put("queue.queue", "direct://amq.direct//MessageListenerTest");
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+ _consumer = clientSession.createConsumer(queue);
+
+ //Create Producer
+
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ producer.send(producerSession.createTextMessage("Message " + msg));
+ }
+
+ producerConnection.close();
+
+ _testAsync = false;
+ }
+
+ protected void tearDown() throws Exception
+ {
+ //Should have recieved all async messages
+ if (_testAsync)
+ {
+ assertEquals(MSG_COUNT, receivedCount);
+ }
+ _clientConnection.close();
+
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testSynchronousRecieve() throws Exception
+ {
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertTrue(_consumer.receive(2000) != null);
+ }
+ }
+
+ public void testAsynchronousRecieve() throws Exception
+ {
+ _testAsync = true;
+
+ _consumer.setMessageListener(this);
+
+
+ _logger.info("Waiting 3 seconds for messages");
+
+ try
+ {
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e)
+ {
+ //do nothing
+ }
+
+ }
+
+ public void onMessage(Message message)
+ {
+ _logger.info("Received Message(" + receivedCount + "):" + message);
+
+ receivedCount++;
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(MessageListenerTest.class);
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java b/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java
new file mode 100644
index 0000000000..f7bea1b36a
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/client/message/TestNonQpidTextMessage.java
@@ -0,0 +1,231 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import javax.jms.*;
+import java.util.Enumeration;
+import java.io.Serializable;
+
+public class TestNonQpidTextMessage implements ObjectMessage {
+
+ private JMSObjectMessage _realMessage;
+ private String _contentString;
+
+ /**
+ * Allows us to construct a JMS message which
+ * does not inherit from the Qpid message superclasses
+ * and expand our unit testing of MessageConverter et al
+ */
+ public TestNonQpidTextMessage()
+ {
+ _realMessage = new JMSObjectMessage();
+ }
+
+ public String getJMSMessageID() throws JMSException {
+ return _realMessage.getJMSMessageID();
+ }
+
+ public void setJMSMessageID(String string) throws JMSException {
+ _realMessage.setJMSMessageID(string);
+ }
+
+ public long getJMSTimestamp() throws JMSException {
+ return _realMessage.getJMSTimestamp();
+ }
+
+ public void setJMSTimestamp(long l) throws JMSException {
+ _realMessage.setJMSTimestamp(l);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+ return _realMessage.getJMSCorrelationIDAsBytes();
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException {
+ _realMessage.setJMSCorrelationIDAsBytes(bytes);
+ }
+
+ public void setJMSCorrelationID(String string) throws JMSException {
+ _realMessage.setJMSCorrelationID(string);
+ }
+
+ public String getJMSCorrelationID() throws JMSException {
+ return _realMessage.getJMSCorrelationID();
+ }
+
+ public Destination getJMSReplyTo() throws JMSException {
+ return _realMessage.getJMSReplyTo();
+ }
+
+ public void setJMSReplyTo(Destination destination) throws JMSException {
+ _realMessage.setJMSReplyTo(destination);
+ }
+
+ public Destination getJMSDestination() throws JMSException {
+ return _realMessage.getJMSDestination();
+ }
+
+ public void setJMSDestination(Destination destination) throws JMSException {
+ _realMessage.setJMSDestination(destination);
+ }
+
+ public int getJMSDeliveryMode() throws JMSException {
+ return _realMessage.getJMSDeliveryMode();
+ }
+
+ public void setJMSDeliveryMode(int i) throws JMSException {
+ _realMessage.setJMSDeliveryMode(i);
+ }
+
+ public boolean getJMSRedelivered() throws JMSException {
+ return _realMessage.getJMSRedelivered();
+ }
+
+ public void setJMSRedelivered(boolean b) throws JMSException {
+ _realMessage.setJMSRedelivered(b);
+ }
+
+ public String getJMSType() throws JMSException {
+ return _realMessage.getJMSType();
+ }
+
+ public void setJMSType(String string) throws JMSException {
+ _realMessage.setJMSType(string);
+ }
+
+ public long getJMSExpiration() throws JMSException {
+ return _realMessage.getJMSExpiration();
+ }
+
+ public void setJMSExpiration(long l) throws JMSException {
+ _realMessage.setJMSExpiration(l);
+ }
+
+ public int getJMSPriority() throws JMSException {
+ return _realMessage.getJMSPriority();
+ }
+
+ public void setJMSPriority(int i) throws JMSException {
+ _realMessage.setJMSPriority(i);
+ }
+
+ public void clearProperties() throws JMSException {
+ _realMessage.clearProperties();
+ }
+
+ public boolean propertyExists(String string) throws JMSException {
+ return _realMessage.propertyExists(string);
+ }
+
+ public boolean getBooleanProperty(String string) throws JMSException {
+ return _realMessage.getBooleanProperty(string);
+ }
+
+ public byte getByteProperty(String string) throws JMSException {
+ return _realMessage.getByteProperty(string);
+ }
+
+ public short getShortProperty(String string) throws JMSException {
+ return _realMessage.getShortProperty(string);
+ }
+
+ public int getIntProperty(String string) throws JMSException {
+ return _realMessage.getIntProperty(string);
+ }
+
+ public long getLongProperty(String string) throws JMSException {
+ return _realMessage.getLongProperty(string);
+ }
+
+ public float getFloatProperty(String string) throws JMSException {
+ return _realMessage.getFloatProperty(string);
+ }
+
+ public double getDoubleProperty(String string) throws JMSException {
+ return _realMessage.getDoubleProperty(string);
+ }
+
+ public String getStringProperty(String string) throws JMSException {
+ return _realMessage.getStringProperty(string);
+ }
+
+ public Object getObjectProperty(String string) throws JMSException {
+ return _realMessage.getObjectProperty(string);
+ }
+
+ public Enumeration getPropertyNames() throws JMSException {
+ return _realMessage.getPropertyNames();
+ }
+
+ public void setBooleanProperty(String string, boolean b) throws JMSException {
+ _realMessage.setBooleanProperty(string,b);
+ }
+
+ public void setByteProperty(String string, byte b) throws JMSException {
+ _realMessage.setByteProperty(string,b);
+ }
+
+ public void setShortProperty(String string, short i) throws JMSException {
+ _realMessage.setShortProperty(string,i);
+ }
+
+ public void setIntProperty(String string, int i) throws JMSException {
+ _realMessage.setIntProperty(string,i);
+ }
+
+ public void setLongProperty(String string, long l) throws JMSException {
+ _realMessage.setLongProperty(string,l);
+ }
+
+ public void setFloatProperty(String string, float v) throws JMSException {
+ _realMessage.setFloatProperty(string,v);
+ }
+
+ public void setDoubleProperty(String string, double v) throws JMSException {
+ _realMessage.setDoubleProperty(string,v);
+ }
+
+ public void setStringProperty(String string, String string1) throws JMSException {
+ _realMessage.setStringProperty(string,string1);
+ }
+
+ public void setObjectProperty(String string, Object object) throws JMSException {
+ _realMessage.setObjectProperty(string,object);
+ }
+
+ public void acknowledge() throws JMSException {
+ _realMessage.acknowledge();
+ }
+
+ public void clearBody() throws JMSException {
+ _realMessage.clearBody();
+ }
+
+ public void setObject(Serializable serializable) throws JMSException {
+ if (serializable instanceof String)
+ {
+ _contentString = (String)serializable;
+ }
+ }
+
+ public Serializable getObject() throws JMSException {
+ return _contentString; }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index d12ab01bdc..4a8c0145c4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -25,37 +25,46 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
import javax.jms.*;
+import java.util.concurrent.atomic.AtomicInteger;
public class RecoverTest extends TestCase
{
private static final Logger _logger = Logger.getLogger(RecoverTest.class);
+ private Exception _error;
+ private AtomicInteger count;
+
protected void setUp() throws Exception
{
super.setUp();
TransportConnection.createVMBroker(1);
+ _error = null;
+ count = new AtomicInteger();
}
protected void tearDown() throws Exception
{
super.tearDown();
TransportConnection.killAllVMBrokers();
+ count = null;
}
public void testRecoverResendsMsgs() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -104,15 +113,15 @@ public class RecoverTest extends TestCase
public void testRecoverResendsMsgsAckOnEarlier() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
+ ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
@@ -168,15 +177,15 @@ public class RecoverTest extends TestCase
public void testAcknowledgePerConsumer() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("Q1", "Q1", false, true);
- Queue queue2 = new AMQQueue("Q2", "Q2", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
+ Queue queue2 = new AMQQueue(new AMQShortString("Q2"), new AMQShortString("Q2"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
MessageProducer producer2 = producerSession.createProducer(queue2);
@@ -207,41 +216,96 @@ public class RecoverTest extends TestCase
public void testRecoverInAutoAckListener() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = new AMQQueue("Q1", "Q1", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("Q3"), new AMQShortString("Q3"), false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageProducer producer = consumerSession.createProducer(queue);
producer.send(consumerSession.createTextMessage("hello"));
- MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+
+ final Object lock = new Object();
+
consumer.setMessageListener(new MessageListener()
{
- private int count = 0;
+
+
public void onMessage(Message message)
{
try
{
- if (count++ == 0)
+ count.incrementAndGet();
+ if (count.get() == 1)
{
- assertFalse(message.getJMSRedelivered());
+ if(message.getJMSRedelivered())
+ {
+ setError(new Exception("Message marked as redilvered on what should be first delivery attempt"));
+ }
consumerSession.recover();
}
- else if (count++ == 1)
+ else if (count.get() == 2)
{
- assertTrue(message.getJMSRedelivered());
+ if(!message.getJMSRedelivered())
+ {
+ setError(new Exception("Message not marked as redilvered on what should be second delivery attempt"));
+ }
}
else
{
- fail("Message delivered too many times!");
+ System.err.println(message);
+ fail("Message delivered too many times!: " + count);
}
}
catch (JMSException e)
{
_logger.error("Error recovering session: " + e, e);
+ setError(e);
+ }
+ synchronized(lock)
+ {
+ lock.notify();
}
}
});
+
+ con.start();
+
+ long waitTime = 300000L;
+ long waitUntilTime = System.currentTimeMillis() + waitTime;
+
+ synchronized(lock)
+ {
+ while((count.get() <= 1) && (waitTime > 0))
+ {
+ lock.wait(waitTime);
+ if(count.get() <= 1)
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
+ }
+
+ Thread.sleep(1000);
+
+ if(count.get() != 2)
+ {
+ System.err.println("Count != 2 : " + count);
+ }
+ assertTrue(count.get() == 2);
+
+ con.close();
+
+ if(_error != null)
+ {
+ throw _error;
+ }
+ }
+
+ private void setError(Exception e)
+ {
+ _error = e;
}
public static junit.framework.Test suite()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
index 59be38f0dd..cf5b5c76e5 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/BytesMessageTest.java
@@ -48,7 +48,7 @@ public class BytesMessageTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
index ad180e3a89..fb347053c7 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
@@ -35,16 +35,20 @@ import junit.framework.TestCase;
public class FieldTableKeyEnumeratorTest extends TestCase
{
+ public void testTrue()
+ {
+
+ }
public void testKeyEnumeration()
{
FieldTable result = FieldTableFactory.newFieldTable();
- result.put("one", 1L);
- result.put("two", 2L);
- result.put("three", 3L);
- result.put("four", 4L);
- result.put("five", 5L);
+ result.setObject("one", 1L);
+ result.setObject("two", 2L);
+ result.setObject("three", 3L);
+ result.setObject("four", 4L);
+ result.setObject("five", 5L);
- Iterator iterator = result.keySet().iterator();
+ Iterator iterator = result.keys().iterator();
try
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
index f4efd64dbb..d1e90e7bcd 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
@@ -54,7 +54,7 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception
@@ -85,11 +85,11 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
private FieldTable load() throws IOException
{
FieldTable result = FieldTableFactory.newFieldTable();
- result.put("one", 1L);
- result.put("two", 2L);
- result.put("three", 3L);
- result.put("four", 4L);
- result.put("five", 5L);
+ result.setLong("one", 1L);
+ result.setLong("two", 2L);
+ result.setLong("three", 3L);
+ result.setLong("four", 4L);
+ result.setLong("five", 5L);
return result;
}
@@ -133,10 +133,9 @@ public class FieldTableMessageTest extends TestCase implements MessageListener
{
ByteBuffer buffer = ((JMSBytesMessage) m).getData();
FieldTable actual = FieldTableFactory.newFieldTable(buffer, buffer.remaining());
- for (Object o : _expected.keySet())
- {
- String key = (String) o;
- assertEquals("Values for " + key + " did not match", _expected.get(key), actual.get(key));
+ for (String key : _expected.keys())
+ {
+ assertEquals("Values for " + key + " did not match", _expected.getObject(key), actual.getObject(key));
}
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
index bc2def1c64..29770704c5 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
@@ -56,7 +56,7 @@ public class MapMessageTest extends TestCase implements MessageListener
try
{
TransportConnection.createVMBroker(1);
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
catch (Exception e)
{
@@ -144,13 +144,29 @@ public class MapMessageTest extends TestCase implements MessageListener
}
- void waitFor(int count) throws InterruptedException
+ void waitFor(int count) throws Exception
{
+ long waitTime = 30000L;
+ long waitUntilTime = System.currentTimeMillis() + 30000L;
+
+
synchronized(received)
{
- while (received.size() < count)
+ while(received.size() < count && waitTime>0)
+ {
+ if (received.size() < count)
+ {
+ received.wait(waitTime);
+ }
+
+ if (received.size() < count)
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
+ if (received.size() < count)
{
- received.wait();
+ throw new Exception("Timed-out. Waiting for " + count + " only got " + received.size());
}
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
index 1e9de221d4..66d82a991e 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/MultipleConnectionTest.java
@@ -41,7 +41,7 @@ public class MultipleConnectionTest extends TestCase
Receiver(String broker, AMQDestination dest, int sessions) throws Exception
{
- this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest, sessions);
+ this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "test"), dest, sessions);
}
Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception
@@ -72,7 +72,7 @@ public class MultipleConnectionTest extends TestCase
Publisher(String broker, AMQDestination dest) throws Exception
{
- this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "/test_path"), dest);
+ this(new AMQConnection(broker, "guest", "guest", randomize("Client"), "test"), dest);
}
Publisher(AMQConnection connection, AMQDestination dest) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
index 3f726ae5ab..dc1aadaa6c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ObjectMessageTest.java
@@ -50,7 +50,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
TransportConnection.createVMBroker(1);
try
{
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
catch (Exception e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index 17679788bd..d0126e1917 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -29,12 +29,7 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.testutil.VMBrokerSetup;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
+import javax.jms.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -58,7 +53,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
super.setUp();
try
{
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
catch (Exception e)
{
@@ -81,7 +76,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
{
_connection = connection;
_destination = destination;
- _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//set up a slow consumer
_session.createConsumer(destination).setMessageListener(this);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java
new file mode 100644
index 0000000000..c54ea8c6e6
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionRefTest.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.basic;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.BasicMessageProducer;
+
+import javax.jms.*;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class PubSubTwoConnectionRefTest extends TestCase
+{
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ /**
+ * This tests that a consumer is set up synchronously
+ * @throws Exception
+ */
+ public void testTwoConnections() throws Exception
+ {
+ AMQTopic topic = new AMQTopic("MyTopic");
+ AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
+ AMQSession session1 = con1.createAMQSession(false, AMQSession.NO_ACKNOWLEDGE);
+ BasicMessageProducer producer = session1.createBasicProducer(topic);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test");
+ Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ MessageConsumer consumer = session2.createConsumer(topic);
+ con2.start();
+ producer.sendRef(session1.createTextMessage("Hello ref"));
+// producer.sendRef(session1.createTextMessage("Goodbye ref"));
+ TextMessage tm1 = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm1);
+ assertEquals("Hello ref", tm1.getText());
+// assertEquals("Goodbye ref", tm1.getText());
+ }
+
+ public static void main(String[] args){
+ PubSubTwoConnectionRefTest test = new PubSubTwoConnectionRefTest();
+ try {
+ test.setUp();
+ test.testTwoConnections();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
index b853963c96..937944e340 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java
@@ -51,11 +51,11 @@ public class PubSubTwoConnectionTest extends TestCase
public void testTwoConnections() throws Exception
{
Topic topic = new AMQTopic("MyTopic");
- Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "/test_path");
+ Connection con1 = new AMQConnection("vm://:1", "guest", "guest", "Client1", "test");
Session session1 = con1.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageProducer producer = session1.createProducer(topic);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "/test_path");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "Client2", "test");
Session session2 = con2.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer = session2.createConsumer(topic);
con2.start();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
index 302551b05c..1db62cffa9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/ReceiveTest.java
@@ -48,7 +48,7 @@ public class ReceiveTest extends TestCase
{
createVMBroker();
String broker = _connectionString;
- init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "/test_path"));
+ init(new AMQConnection(broker, "guest", "guest", "ReceiveTestClient", "test"));
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
index 27a2ccb32e..fe15e151a3 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
@@ -51,7 +51,7 @@ public class SelectorTest extends TestCase implements MessageListener
{
super.setUp();
TransportConnection.createVMBroker(1);
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
index 726c7e39d7..cce02accd8 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SessionStartTest.java
@@ -43,7 +43,7 @@ public class SessionStartTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
index 903f6a9da9..b50cd39780 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
@@ -52,7 +52,7 @@ public class TextMessageTest extends TestCase implements MessageListener
super.setUp();
try
{
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path"));
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
catch (Exception e)
{
@@ -75,7 +75,7 @@ public class TextMessageTest extends TestCase implements MessageListener
{
_connection = connection;
_destination = destination;
- _session = (AMQSession) connection.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
+ _session = (AMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//set up a slow consumer
_session.createConsumer(destination).setMessageListener(this);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
index 68bdc6ddf2..db4e18a4a1 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
@@ -24,7 +24,7 @@ import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.client.transport.TransportConnection;
import javax.jms.*;
@@ -41,13 +41,15 @@ public class AMQConnectionTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test");
+ TransportConnection.createVMBroker(1);
+ _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test");
_topic = new AMQTopic("mytopic");
_queue = new AMQQueue("myqueue");
}
protected void tearDown() throws Exception
{
+ super.tearDown();
try
{
_connection.close();
@@ -55,8 +57,8 @@ public class AMQConnectionTest extends TestCase
catch (JMSException e)
{
//ignore
- }
- super.tearDown();
+ }
+ TransportConnection.killAllVMBrokers();
}
/**
@@ -195,6 +197,6 @@ public class AMQConnectionTest extends TestCase
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(AMQConnectionTest.class));
+ return new junit.framework.TestSuite(AMQConnectionTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
index 67c4f1dd6b..b01a129bf2 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
@@ -46,7 +46,7 @@ public class AMQSessionTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test");
+ _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "test");
_topic = new AMQTopic("mytopic");
_queue = new AMQQueue("myqueue");
_session = (AMQSession) _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
index ac789eb915..05d83be47f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -20,17 +20,17 @@
*/
package org.apache.qpid.test.unit.client.channelclose;
+import junit.framework.TestCase;
+import junit.textui.TestRunner;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.testutil.VMBrokerSetup;
-import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
import javax.jms.*;
import java.util.ArrayList;
import java.util.List;
-import junit.framework.TestCase;
-import junit.textui.TestRunner;
/**
* Due to bizarre exception handling all sessions are closed if you get
@@ -64,7 +64,8 @@ public class ChannelCloseOkTest extends TestCase
{
super.setUp();
- _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path");
+ TransportConnection.createVMBroker(1);
+ _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test");
_destination1 = new AMQQueue("q1", true);
_destination2 = new AMQQueue("q2", true);
@@ -192,7 +193,15 @@ public class ChannelCloseOkTest extends TestCase
{
while (received.size() < count)
{
- received.wait();
+ try
+ {
+ received.wait();
+ }
+ catch (InterruptedException e)
+ {
+ _log.info("Interrupted: " + e);
+ throw e;
+ }
}
}
}
@@ -209,6 +218,6 @@ public class ChannelCloseOkTest extends TestCase
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(ChannelCloseOkTest.class));
+ return new junit.framework.TestSuite(ChannelCloseOkTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
index 0b3ed931f8..7a665daeb3 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
@@ -49,7 +49,7 @@ public class CloseWithBlockingReceiveTest extends TestCase
public void testReceiveReturnsNull() throws Exception
{
final Connection connection = new AMQConnection("vm://:1", "guest", "guest",
- "fred", "/test");
+ "fred", "test");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(new AMQTopic("banana"));
connection.start();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index c7fc3efc87..2ee9fad5d4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -26,6 +26,7 @@ import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.AMQConnectionFailureException;
import javax.jms.Connection;
@@ -47,14 +48,15 @@ public class ConnectionTest extends TestCase
protected void tearDown() throws Exception
{
- TransportConnection.killAllVMBrokers();
+ TransportConnection.killVMBroker(1);
}
public void testSimpleConnection()
{
try
{
- new AMQConnection(_broker, "guest", "guest", "fred", "/test");
+ AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test");
+ conn.close();
}
catch (Exception e)
{
@@ -93,6 +95,7 @@ public class ConnectionTest extends TestCase
fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe);
}
}
+
}
public void testUnresolvedHostFailure() throws Exception
@@ -114,7 +117,7 @@ public class ConnectionTest extends TestCase
public void testClientIdCannotBeChanged() throws Exception
{
Connection connection = new AMQConnection(_broker, "guest", "guest",
- "fred", "/test");
+ "fred", "test");
try
{
connection.setClientID("someClientId");
@@ -129,7 +132,7 @@ public class ConnectionTest extends TestCase
public void testClientIdIsPopulatedAutomatically() throws Exception
{
Connection connection = new AMQConnection(_broker, "guest", "guest",
- null, "/test");
+ null, "test");
assertNotNull(connection.getClientID());
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index 147d2ae43e..a23c78822f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -33,14 +33,14 @@ public class ConnectionURLTest extends TestCase
public void testFailoverURL() throws URLSyntaxException
{
- String url = "amqp://ritchiem:bob@/temp?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
+ String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals("bob"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getBrokerCount() == 2);
@@ -60,14 +60,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportUsernamePasswordURL() throws URLSyntaxException
{
- String url = "amqp://ritchiem:bob@/temp?brokerlist='tcp://localhost:5672'";
+ String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals("bob"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -80,14 +80,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportUsernameBlankPasswordURL() throws URLSyntaxException
{
- String url = "amqp://ritchiem:@/temp?brokerlist='tcp://localhost:5672'";
+ String url = "amqp://ritchiem:@/test?brokerlist='tcp://localhost:5672'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals(""));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -100,7 +100,7 @@ public class ConnectionURLTest extends TestCase
public void testFailedURLNullPassword()
{
- String url = "amqp://ritchiem@/temp?brokerlist='tcp://localhost:5672'";
+ String url = "amqp://ritchiem@/test?brokerlist='tcp://localhost:5672'";
try
{
@@ -140,7 +140,7 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportWithClientURLURL() throws URLSyntaxException
{
- String url = "amqp://guest:guest@clientname/temp?brokerlist='tcp://localhost:5672'";
+ String url = "amqp://guest:guest@clientname/test?brokerlist='tcp://localhost:5672'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
@@ -148,7 +148,7 @@ public class ConnectionURLTest extends TestCase
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getClientName().equals("clientname"));
@@ -164,14 +164,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransport1OptionURL() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',routingkey='jim'";
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -187,14 +187,14 @@ public class ConnectionURLTest extends TestCase
public void testSingleTransportDefaultedBroker() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/temp?brokerlist='localhost'";
+ String url = "amqp://guest:guest@/test?brokerlist='localhost'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -207,17 +207,83 @@ public class ConnectionURLTest extends TestCase
assertTrue(service.getPort() == 5672);
}
+ public void testSingleTransportDefaultedBrokerWithPort() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='localhost:1234'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getFailoverMethod() == null);
+ assertTrue(connectionurl.getUsername().equals("guest"));
+ assertTrue(connectionurl.getPassword().equals("guest"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+
+ assertTrue(connectionurl.getBrokerCount() == 1);
+
+ BrokerDetails service = connectionurl.getBrokerDetails(0);
+
+ assertTrue(service.getTransport().equals("tcp"));
+
+ assertTrue(service.getHost().equals("localhost"));
+ assertTrue(service.getPort() == 1234);
+ }
+
+ public void testSingleTransportDefaultedBrokerWithIP() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getFailoverMethod() == null);
+ assertTrue(connectionurl.getUsername().equals("guest"));
+ assertTrue(connectionurl.getPassword().equals("guest"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+
+ assertTrue(connectionurl.getBrokerCount() == 1);
+
+ BrokerDetails service = connectionurl.getBrokerDetails(0);
+
+ assertTrue(service.getTransport().equals("tcp"));
+
+ assertTrue(service.getHost().equals("127.0.0.1"));
+ assertTrue(service.getPort() == 5672);
+ }
+
+ public void testSingleTransportDefaultedBrokerWithIPandPort() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='127.0.0.1:1234'";
+
+// ConnectionURL connectionurl = new AMQConnectionURL(url);
+//
+// assertTrue(connectionurl.getFailoverMethod() == null);
+// assertTrue(connectionurl.getUsername().equals("guest"));
+// assertTrue(connectionurl.getPassword().equals("guest"));
+// assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+//
+//
+// assertTrue(connectionurl.getBrokerCount() == 1);
+//
+// BrokerDetails service = connectionurl.getBrokerDetails(0);
+//
+// assertTrue(service.getTransport().equals("tcp"));
+//
+// assertTrue(service.getHost().equals("127.0.0.1"));
+// assertTrue(service.getPort() == 1234);
+ }
+
public void testSingleTransportMultiOptionURL() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/temp?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'";
+ String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -235,14 +301,14 @@ public class ConnectionURLTest extends TestCase
public void testSinglevmURL() throws URLSyntaxException
{
- String url = "amqp://guest:guest@/messages?brokerlist='vm://:2'";
+ String url = "amqp://guest:guest@/test?brokerlist='vm://:2'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod() == null);
assertTrue(connectionurl.getUsername().equals("guest"));
assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/messages"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getBrokerCount() == 1);
@@ -256,14 +322,14 @@ public class ConnectionURLTest extends TestCase
public void testFailoverVMURL() throws URLSyntaxException
{
- String url = "amqp://ritchiem:bob@/temp?brokerlist='vm://:2;vm://:3',failover='roundrobin'";
+ String url = "amqp://ritchiem:bob@/test?brokerlist='vm://:2;vm://:3',failover='roundrobin'";
ConnectionURL connectionurl = new AMQConnectionURL(url);
assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
assertTrue(connectionurl.getUsername().equals("ritchiem"));
assertTrue(connectionurl.getPassword().equals("bob"));
- assertTrue(connectionurl.getVirtualHost().equals("/temp"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
assertTrue(connectionurl.getBrokerCount() == 2);
@@ -309,7 +375,6 @@ public class ConnectionURLTest extends TestCase
}
-
public void testWrongOptionSeparatorInOptions()
{
String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672;tcp://localhost:5673'+failover='roundrobin'";
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
index 6c2c684362..db0d3e0eab 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
@@ -65,29 +65,35 @@ public class Client implements MessageListener
_connection.close();
}
- public void onMessage(Message response)
+ public synchronized void onMessage(Message response)
{
+
System.out.println("Received " + (++_count) + " of " + _expected + " responses.");
if(_count == _expected)
{
- synchronized(this)
- {
- notifyAll();
- }
+
+ notifyAll();
}
+
+
}
- synchronized void waitUntilComplete() throws InterruptedException
+ synchronized void waitUntilComplete() throws Exception
{
- while(_count < _expected)
+
+ if(_count < _expected)
+ {
+ wait(10000L);
+ }
+ if(_count < _expected)
{
- wait();
+ throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected);
}
}
static AMQConnection connect(String broker) throws Exception
{
- return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path");
+ return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
}
public static void main(String[] argv) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
index a1c64e2246..58f9c6fc19 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Service.java
@@ -73,7 +73,7 @@ public class Service implements MessageListener
static AMQConnection connect(String broker) throws Exception
{
- return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "/test_path");
+ return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test");
}
// public static void main(String[] argv) throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
index 22015dbc93..691acbb213 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/SpecialQueue.java
@@ -21,6 +21,7 @@
package org.apache.qpid.test.unit.client.forwardall;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
/**
* Queue that allows several private queues to be registered and bound
@@ -29,15 +30,19 @@ import org.apache.qpid.client.AMQQueue;
*/
class SpecialQueue extends AMQQueue
{
- private final String name;
+ private final AMQShortString name;
SpecialQueue(String name)
{
+ this(new AMQShortString(name));
+ }
+ SpecialQueue(AMQShortString name)
+ {
super(name, true);
this.name = name;
}
- public String getRoutingKey()
+ public AMQShortString getRoutingKey()
{
return name;
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
index bbd1870168..0e4603ed24 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
@@ -54,7 +54,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "/test_path");
+ connection = new AMQConnection(_broker, "guest", "guest", randomize("Client"), "test");
destination = new AMQQueue(randomize("LatencyTest"), true);
session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -101,6 +101,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
}
catch (Exception e)
{
+ e.printStackTrace();
fail("This Test should succeed but failed due to: " + e);
}
finally
@@ -236,7 +237,7 @@ public class ObjectMessageTest extends TestCase implements MessageListener
public void onMessage(Message message)
{
- received++;
+
try
{
if (message instanceof ObjectMessage)
@@ -255,13 +256,11 @@ public class ObjectMessageTest extends TestCase implements MessageListener
items.add(e);
}
- if (waiting)
- {
synchronized(this)
{
+ received++;
notify();
}
- }
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index eee9b2de9f..64898a1b9a 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.test.unit.client.protocol;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.mina.common.IoSession;
import junit.framework.TestCase;
@@ -45,7 +46,7 @@ public class AMQProtocolSessionTest extends TestCase
return (TestIoSession) _minaProtocolSession;
}
- public String genQueueName()
+ public AMQShortString genQueueName()
{
return generateQueueName();
}
@@ -80,26 +81,26 @@ public class AMQProtocolSessionTest extends TestCase
public void testGenerateQueueName()
{
- String testAddress;
+ AMQShortString testAddress;
- //test address with / and ; chars which generateQueueName should remove
+ //test address with / and ; chars which generateQueueName should removeKey
_testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress);
_testSession.getMinaProtocolSession().setLocalPort(_port);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress);
+ assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString());
//test empty address
_testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress);
+ assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString());
//test address with no special chars
_testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress);
testAddress = _testSession.genQueueName();
- assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress);
+ assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString());
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
index 6c064e3853..b6c539d91c 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
@@ -28,7 +28,7 @@ public class TemporaryQueueTest extends TestCase
protected Connection createConnection() throws AMQException, URLSyntaxException
{
return new AMQConnection(_broker, "guest", "guest",
- "fred", "/test");
+ "fred", "test");
}
public void testTempoaryQueue() throws Exception
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
index c9240e9be7..7cbd4e8bdd 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/TopicPublisherCloseTest.java
@@ -51,7 +51,7 @@ public class TopicPublisherCloseTest extends TestCase
public void testAllMethodsThrowAfterConnectionClose() throws Exception
{
- AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "/test_path");
+ AMQConnection connection = new AMQConnection(_connectionString, "guest", "guest", "Client", "test");
Topic destination1 = new AMQTopic("t1");
TopicSession session1 = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
index c14b5317c7..1f53d7de65 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
@@ -11,6 +11,7 @@ import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
import javax.jms.*;
@@ -39,12 +40,12 @@ public class JMSDestinationTest extends TestCase
public void testJMSDestination() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
new file mode 100644
index 0000000000..c09d2504eb
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.message;
+
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.message.TestNonQpidTextMessage;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.jms.*;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSPropertiesTest extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(JMSPropertiesTest.class);
+
+ public String _connectionString = "vm://:1";
+
+ public static final String JMS_CORR_ID = "QPIDID_01";
+ public static final int JMS_DELIV_MODE = 1;
+ public static final String JMS_TYPE = "test.jms.type";
+ public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto");
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+ public void testJMSProperties() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue(new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
+ Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ //create a test message to send
+ ObjectMessage sentMsg = new TestNonQpidTextMessage();
+ sentMsg.setJMSCorrelationID(JMS_CORR_ID);
+ sentMsg.setJMSDeliveryMode(JMS_DELIV_MODE);
+ sentMsg.setJMSType(JMS_TYPE);
+ sentMsg.setJMSReplyTo(JMS_REPLY_TO);
+
+ //send it
+ producer.send(sentMsg);
+
+ con2.close();
+
+ con.start();
+
+ //get message and check JMS properties
+ ObjectMessage rm = (ObjectMessage) consumer.receive();
+ assertNotNull(rm);
+
+ assertEquals("JMS Correlation ID mismatch",sentMsg.getJMSCorrelationID(),rm.getJMSCorrelationID());
+ //TODO: Commented out as always overwritten by send delivery mode value - prob should not set in conversion
+ //assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode());
+ assertEquals("JMS Type mismatch",sentMsg.getJMSType(),rm.getJMSType());
+ assertEquals("JMS Reply To mismatch",sentMsg.getJMSReplyTo(),rm.getJMSReplyTo());
+
+ con.close();
+ }
+
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
new file mode 100644
index 0000000000..6a335b8627
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.message;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.message.MessageConverter;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Message;
+import javax.jms.Destination;
+import javax.jms.TextMessage;
+import javax.jms.MapMessage;
+import java.util.HashMap;
+
+
+public class MessageConverterTest extends TestCase {
+
+ public static final String JMS_CORR_ID = "QPIDID_01";
+ public static final int JMS_DELIV_MODE = 1;
+ public static final String JMS_TYPE = "test.jms.type";
+ public static final Destination JMS_REPLY_TO = new AMQQueue("my.replyto");
+
+ protected JMSTextMessage testTextMessage;
+
+ protected JMSMapMessage testMapMessage;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ testTextMessage = new JMSTextMessage();
+
+ //Add JMSProperties
+ testTextMessage.setJMSCorrelationID(JMS_CORR_ID);
+ testTextMessage.setJMSDeliveryMode(JMS_DELIV_MODE);
+ testTextMessage.setJMSType(JMS_TYPE);
+ testTextMessage.setJMSReplyTo(JMS_REPLY_TO);
+ testTextMessage.setText("testTextMessage text");
+
+ //Add non-JMS properties
+ testTextMessage.setStringProperty("testProp1","testValue1");
+ testTextMessage.setDoubleProperty("testProp2",Double.MIN_VALUE);
+
+ testMapMessage = new JMSMapMessage();
+ testMapMessage.setString("testMapString","testMapStringValue");
+ testMapMessage.setDouble("testMapDouble",Double.MAX_VALUE);
+ }
+
+ public void testSetProperties() throws Exception
+ {
+ AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage();
+
+ //check JMS prop values on newMessage match
+ assertEquals("JMS Correlation ID mismatch",testTextMessage.getJMSCorrelationID(),newMessage.getJMSCorrelationID());
+ assertEquals("JMS Delivery mode mismatch",testTextMessage.getJMSDeliveryMode(),newMessage.getJMSDeliveryMode());
+ assertEquals("JMS Type mismatch",testTextMessage.getJMSType(),newMessage.getJMSType());
+ assertEquals("JMS Reply To mismatch",testTextMessage.getJMSReplyTo(),newMessage.getJMSReplyTo());
+
+ //check non-JMS standard props ok too
+ assertEquals("Test String prop value mismatch",testTextMessage.getStringProperty("testProp1"),
+ newMessage.getStringProperty("testProp1"));
+ assertEquals("Test Double prop value mismatch",testTextMessage.getDoubleProperty("testProp2"),
+ newMessage.getDoubleProperty("testProp2"));
+ }
+
+ public void testJMSTextMessageConversion() throws Exception
+ {
+ AbstractJMSMessage newMessage = new MessageConverter((TextMessage)testTextMessage).getConvertedMessage();
+ assertEquals("Converted message text mismatch",((JMSTextMessage)newMessage).getText(),testTextMessage.getText());
+ }
+
+ public void testJMSMapMessageConversion() throws Exception
+ {
+ AbstractJMSMessage newMessage = new MessageConverter((MapMessage)testMapMessage).getConvertedMessage();
+ assertEquals("Converted map message String mismatch",((JMSMapMessage)newMessage).getString("testMapString"),
+ testMapMessage.getString("testMapString"));
+ assertEquals("Converted map message Double mismatch",((JMSMapMessage)newMessage).getDouble("testMapDouble"),
+ testMapMessage.getDouble("testMapDouble"));
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ testTextMessage = null;
+ }
+
+
+}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
index 184d7cb015..7d83d19d74 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
@@ -1,11 +1,23 @@
-/**
- * User: Robert Greig
- * Date: 12-Dec-2006
- ******************************************************************************
- * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
- * this program may be photocopied reproduced or translated to another
- * program language without prior written consent of JP Morgan Chase Ltd
- ******************************************************************************/
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.test.unit.message;
import junit.framework.TestCase;
@@ -47,7 +59,7 @@ public class StreamMessageTest extends TestCase
public void testStreamMessageEOF() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -60,7 +72,7 @@ public class StreamMessageTest extends TestCase
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -101,7 +113,7 @@ public class StreamMessageTest extends TestCase
public void testModifyReceivedMessageExpandsBuffer() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
AMQQueue queue = new AMQQueue("testQ");
MessageConsumer consumer = consumerSession.createConsumer(queue);
@@ -123,7 +135,7 @@ public class StreamMessageTest extends TestCase
}
}
});
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer mandatoryProducer = producerSession.createProducer(queue);
con.start();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index 5ded8aaeef..7e645f1a26 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -25,7 +25,7 @@ import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.client.transport.TransportConnection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -39,10 +39,23 @@ import junit.framework.TestCase;
public class DurableSubscriptionTest extends TestCase
{
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
public void testUnsubscribe() throws AMQException, JMSException, URLSyntaxException
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
MessageProducer producer = session1.createProducer(topic);
@@ -83,7 +96,7 @@ public class DurableSubscriptionTest extends TestCase
public void testDurability() throws AMQException, JMSException, URLSyntaxException
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
MessageConsumer consumer1 = session1.createConsumer(topic);
MessageProducer producer = session1.createProducer(topic);
@@ -128,6 +141,6 @@ public class DurableSubscriptionTest extends TestCase
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(DurableSubscriptionTest.class));
+ return new junit.framework.TestSuite(DurableSubscriptionTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
index 4ffb3e8459..c4acf15a58 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicPublisherTest.java
@@ -48,7 +48,7 @@ public class TopicPublisherTest extends TestCase
public void testUnidentifiedProducer() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(null);
MessageConsumer consumer1 = session1.createConsumer(topic);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 794316d2f5..8e883a2184 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -51,7 +51,7 @@ public class TopicSessionTest extends TestCase
public void testTopicSubscriptionUnsubscription() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic");
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic,"subscription0");
TopicPublisher publisher = session1.createPublisher(topic);
@@ -97,7 +97,7 @@ public class TopicSessionTest extends TestCase
{
AMQTopic topic = new AMQTopic("MyTopic1" + String.valueOf(shutdown));
AMQTopic topic2 = new AMQTopic("MyOtherTopic1" + String.valueOf(shutdown));
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(null);
@@ -112,7 +112,7 @@ public class TopicSessionTest extends TestCase
{
session1.close();
con.close();
- con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
con.start();
session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
publisher = session1.createPublisher(null);
@@ -134,11 +134,11 @@ public class TopicSessionTest extends TestCase
public void testUnsubscriptionAfterConnectionClose() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic3");
- AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con1 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
- AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+ AMQConnection con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test");
TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
@@ -149,7 +149,7 @@ public class TopicSessionTest extends TestCase
assertNotNull(tm);
con2.close();
publisher.publish(session1.createTextMessage("Hello2"));
- con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "/test");
+ con2 = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test2", "test");
session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
sub = session2.createDurableSubscriber(topic, "subscription0");
con2.start();
@@ -163,14 +163,14 @@ public class TopicSessionTest extends TestCase
public void testTextMessageCreation() throws Exception
{
AMQTopic topic = new AMQTopic("MyTopic4");
- AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection con = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
con.start();
TextMessage tm = session1.createTextMessage("Hello");
publisher.publish(tm);
- tm = (TextMessage) consumer1.receive(2000);
+ tm = (TextMessage) consumer1.receive(200000L);
assertNotNull(tm);
String msgText = tm.getText();
assertEquals("Hello", msgText);
@@ -178,7 +178,7 @@ public class TopicSessionTest extends TestCase
msgText = tm.getText();
assertNull(msgText);
publisher.publish(tm);
- tm = (TextMessage) consumer1.receive(2000);
+ tm = (TextMessage) consumer1.receive(20000000L);
assertNotNull(tm);
msgText = tm.getText();
assertNull(msgText);
@@ -202,7 +202,7 @@ public class TopicSessionTest extends TestCase
public void testSendingSameMessage() throws Exception
{
- AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
@@ -219,12 +219,13 @@ public class TopicSessionTest extends TestCase
assertNotNull(receivedMessage);
assertEquals(sentMessage.getText(),receivedMessage.getText());
+ conn.close();
}
public void testTemporaryTopic() throws Exception
{
- AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+ AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "test");
TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryTopic topic = session.createTemporaryTopic();
assertNotNull(topic);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 3e1fc04626..ce6df83baf 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -23,8 +23,11 @@ package org.apache.qpid.test.unit.transacted;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.mina.util.SessionLog;
import org.apache.log4j.Logger;
import javax.jms.*;
@@ -54,10 +57,11 @@ public class TransactedTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- queue1 = new AMQQueue("Q1", "Q1", false, true);
+ TransportConnection.createVMBroker(1);
+ queue1 = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
queue2 = new AMQQueue("Q2", false);
- con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "/test");
+ con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
session = con.createSession(true, 0);
consumer1 = session.createConsumer(queue1);
//Dummy just to create the queue.
@@ -66,16 +70,26 @@ public class TransactedTest extends TestCase
producer2 = session.createProducer(queue2);
con.start();
- prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "/test");
+ prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test");
prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
prepProducer1 = prepSession.createProducer(queue1);
prepCon.start();
+
+// //add some messages
+// prepProducer1.send(prepSession.createTextMessage("A"));
+// prepProducer1.send(prepSession.createTextMessage("B"));
+// prepProducer1.send(prepSession.createTextMessage("C"));
+//
+// testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
+// testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+// testConsumer2 = testSession.createConsumer(queue2);
}
protected void tearDown() throws Exception
{
con.close();
prepCon.close();
+ TransportConnection.killAllVMBrokers();
super.tearDown();
}
@@ -96,9 +110,9 @@ public class TransactedTest extends TestCase
//commit
session.commit();
-
+ testCon.start();
//ensure sent messages can be received and received messages are gone
- testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
+ testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
testConsumer1 = testSession.createConsumer(queue1);
testConsumer2 = testSession.createConsumer(queue2);
@@ -108,6 +122,7 @@ public class TransactedTest extends TestCase
expect("Y", testConsumer2.receive(1000));
expect("Z", testConsumer2.receive(1000));
+ testConsumer1 = testSession.createConsumer(queue1);
assertTrue(null == testConsumer1.receive(1000));
assertTrue(null == testConsumer2.receive(1000));
testCon.close();
@@ -141,11 +156,12 @@ public class TransactedTest extends TestCase
expect("A", consumer1.receive(1000));
expect("B", consumer1.receive(1000));
expect("C", consumer1.receive(1000));
-
+ testCon.start();
+ testConsumer1 = testSession.createConsumer(queue1);
//commit
session.commit();
- testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "/test");
+ testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
testConsumer1 = testSession.createConsumer(queue1);
testConsumer2 = testSession.createConsumer(queue2);
@@ -164,7 +180,7 @@ public class TransactedTest extends TestCase
public void testResendsMsgsAfterSessionClose() throws Exception
{
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
AMQQueue queue3 = new AMQQueue("Q3", false);
@@ -172,7 +188,7 @@ public class TransactedTest extends TestCase
//force synch to ensure the consumer has resulted in a bound queue
((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue3);
@@ -234,6 +250,7 @@ public class TransactedTest extends TestCase
con.close();
con2.close();
+
}
// This checks that queue Q1 is in fact empty and does not have any stray
@@ -251,6 +268,6 @@ public class TransactedTest extends TestCase
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(TransactedTest.class));
+ return new junit.framework.TestSuite(TransactedTest.class);
}
}