summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-20 21:04:49 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-20 21:04:49 +0000
commit8913650e0f7ad901056d7eddff8a108e283a20d1 (patch)
tree8d8a8fc10d835431de9925465dcef330c804e130 /java/client
parentbae5b4dac83c2cc28badf10f2fde659066ec27fe (diff)
downloadqpid-python-8913650e0f7ad901056d7eddff8a108e283a20d1.tar.gz
Added simple transaction test with commit and rollback for message reference transfers. There is plenty of scope for more sophisticated tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509754 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedRefTest.java193
2 files changed, 199 insertions, 0 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index f4d588ca9b..d24892d9b6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -837,6 +837,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
// Test purposes only - used for testing refs, which cannot be done using JMS interfaces
+ public BasicMessageProducer createBasicProducer(Destination destination) throws JMSException
+ {
+ return (BasicMessageProducer)createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
+ }
+
+ // Test purposes only - used for testing refs, which cannot be done using JMS interfaces
public BasicMessageProducer createBasicProducer(Topic destination) throws JMSException
{
return (BasicMessageProducer)createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedRefTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedRefTest.java
new file mode 100644
index 0000000000..1e9da6eeae
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedRefTest.java
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.transacted;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.mina.util.SessionLog;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+
+import junit.framework.TestCase;
+
+public class TransactedRefTest extends TestCase
+{
+ private ConnectionTuneParameters tp;
+ private String message;
+ private AMQQueue queue1;
+ private AMQQueue queue2;
+
+ private AMQConnection con;
+ private AMQSession session;
+ private MessageConsumer consumer1;
+ private BasicMessageProducer producer2;
+
+ private AMQConnection prepCon;
+ private AMQSession prepSession;
+ private BasicMessageProducer prepProducer1;
+
+ private AMQConnection testCon;
+ private Session testSession;
+ private MessageConsumer testConsumer1;
+ private MessageConsumer testConsumer2;
+ private static final Logger _logger = Logger.getLogger(TransactedTest.class);
+
+ protected void setUp() throws Exception
+ {
+ tp = new ConnectionTuneParameters();
+ tp.setFrameMax(1000L);
+ tp.setChannelMax(32767);
+ tp.setHeartbeat(600);
+ message = createMessage(2500);
+
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ queue1 = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
+ queue2 = new AMQQueue("Q2", false);
+
+ con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test", tp);
+ session = con.createAMQSession(true, 0);
+ consumer1 = session.createConsumer(queue1);
+ //Dummy just to create the queue.
+ MessageConsumer consumer2 = session.createConsumer(queue2);
+ consumer2.close();
+ producer2 = session.createBasicProducer(queue2);
+ con.start();
+
+ prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test", tp);
+ prepSession = prepCon.createAMQSession(false, AMQSession.NO_ACKNOWLEDGE);
+ prepProducer1 = prepSession.createBasicProducer(queue1);
+ prepCon.start();
+
+ //add some messages
+ prepProducer1.sendAsRef(prepSession.createTextMessage("A" + message));
+ prepProducer1.sendAsRef(prepSession.createTextMessage("B" + message));
+ prepProducer1.sendAsRef(prepSession.createTextMessage("C" + message));
+
+ producer2.send(session.createTextMessage("X" + message));
+ producer2.send(session.createTextMessage("Y" + message));
+ producer2.send(session.createTextMessage("Z" + message));
+ }
+
+ protected void tearDown() throws Exception
+ {
+ testCon.close();
+ con.close();
+ prepCon.close();
+ TransportConnection.killAllVMBrokers();
+ super.tearDown();
+ }
+
+ public void testCommit() throws Exception
+ {
+ expect("A" + message, consumer1.receive(1000));
+ expect("B" + message, consumer1.receive(1000));
+ expect("C" + message, consumer1.receive(1000));
+
+ //commit
+ session.commit();
+
+ //ensure sent messages can be received and received messages are gone
+
+ testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test", tp);
+ testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ testConsumer1 = testSession.createConsumer(queue1);
+ testConsumer2 = testSession.createConsumer(queue2);
+ testCon.start();
+
+ expect("X" + message, testConsumer2.receive(1000));
+ expect("Y" + message, testConsumer2.receive(1000));
+ expect("Z" + message, testConsumer2.receive(1000));
+
+ testConsumer1 = testSession.createConsumer(queue1);
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
+ }
+
+ // This checks that queue Q1 is in fact empty and does not have any stray
+ // messages left over from the last test (which can affect later tests)...
+ public void testEmpty1() throws Exception
+ {
+ assertTrue(null == consumer1.receive(1000));
+ }
+
+ public void testRollback() throws Exception
+ {
+ expect("A" + message, consumer1.receive(1000));
+ expect("B" + message, consumer1.receive(1000));
+ expect("C" + message, consumer1.receive(1000));
+
+ //rollback
+ session.rollback();
+
+ //ensure sent messages are not visible and received messages are requeued
+ expect("A" + message, consumer1.receive(1000));
+ expect("B" + message, consumer1.receive(1000));
+ expect("C" + message, consumer1.receive(1000));
+
+ //commit
+ //session.commit();
+
+
+ testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test", tp);
+ testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ testConsumer1 = testSession.createConsumer(queue1);
+ testConsumer2 = testSession.createConsumer(queue2);
+ testCon.start();
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
+ }
+
+ // This checks that queue Q1 is in fact empty and does not have any stray
+ // messages left over from the last test (which can affect later tests)...
+ public void testEmpty2() throws Exception
+ {
+ assertTrue(null == consumer1.receive(1000));
+ }
+
+ private void expect(String text, Message msg) throws JMSException
+ {
+ assertTrue(msg instanceof TextMessage);
+ assertEquals(text, ((TextMessage) msg).getText());
+ }
+
+ // Utility to create message "012345678901234567890..." for length len chars.
+ private String createMessage(int len)
+ {
+ StringBuffer sb = new StringBuffer(len);
+ for (int i=0; i<len; i++)
+ sb.append(i%10);
+ return sb.toString();
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TransactedTest.class);
+ }
+}