diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-02-20 21:04:49 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-20 21:04:49 +0000 |
| commit | 8913650e0f7ad901056d7eddff8a108e283a20d1 (patch) | |
| tree | 8d8a8fc10d835431de9925465dcef330c804e130 /java/client | |
| parent | bae5b4dac83c2cc28badf10f2fde659066ec27fe (diff) | |
| download | qpid-python-8913650e0f7ad901056d7eddff8a108e283a20d1.tar.gz | |
Added simple transaction test with commit and rollback for message reference transfers. There is plenty of scope for more sophisticated tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509754 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 6 | ||||
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedRefTest.java | 193 |
2 files changed, 199 insertions, 0 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index f4d588ca9b..d24892d9b6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -837,6 +837,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } // Test purposes only - used for testing refs, which cannot be done using JMS interfaces + public BasicMessageProducer createBasicProducer(Destination destination) throws JMSException + { + return (BasicMessageProducer)createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); + } + + // Test purposes only - used for testing refs, which cannot be done using JMS interfaces public BasicMessageProducer createBasicProducer(Topic destination) throws JMSException { return (BasicMessageProducer)createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedRefTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedRefTest.java new file mode 100644 index 0000000000..1e9da6eeae --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedRefTest.java @@ -0,0 +1,193 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.transacted; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageProducer; +import org.apache.qpid.client.ConnectionTuneParameters; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.testutil.VMBrokerSetup; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.mina.util.SessionLog; +import org.apache.log4j.Logger; + +import javax.jms.*; + +import junit.framework.TestCase; + +public class TransactedRefTest extends TestCase +{ + private ConnectionTuneParameters tp; + private String message; + private AMQQueue queue1; + private AMQQueue queue2; + + private AMQConnection con; + private AMQSession session; + private MessageConsumer consumer1; + private BasicMessageProducer producer2; + + private AMQConnection prepCon; + private AMQSession prepSession; + private BasicMessageProducer prepProducer1; + + private AMQConnection testCon; + private Session testSession; + private MessageConsumer testConsumer1; + private MessageConsumer testConsumer2; + private static final Logger _logger = Logger.getLogger(TransactedTest.class); + + protected void setUp() throws Exception + { + tp = new ConnectionTuneParameters(); + tp.setFrameMax(1000L); + tp.setChannelMax(32767); + tp.setHeartbeat(600); + message = createMessage(2500); + + super.setUp(); + TransportConnection.createVMBroker(1); + queue1 = new AMQQueue(new AMQShortString("Q1"), new AMQShortString("Q1"), false, true); + queue2 = new AMQQueue("Q2", false); + + con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test", tp); + session = con.createAMQSession(true, 0); + consumer1 = session.createConsumer(queue1); + //Dummy just to create the queue. + MessageConsumer consumer2 = session.createConsumer(queue2); + consumer2.close(); + producer2 = session.createBasicProducer(queue2); + con.start(); + + prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test", tp); + prepSession = prepCon.createAMQSession(false, AMQSession.NO_ACKNOWLEDGE); + prepProducer1 = prepSession.createBasicProducer(queue1); + prepCon.start(); + + //add some messages + prepProducer1.sendAsRef(prepSession.createTextMessage("A" + message)); + prepProducer1.sendAsRef(prepSession.createTextMessage("B" + message)); + prepProducer1.sendAsRef(prepSession.createTextMessage("C" + message)); + + producer2.send(session.createTextMessage("X" + message)); + producer2.send(session.createTextMessage("Y" + message)); + producer2.send(session.createTextMessage("Z" + message)); + } + + protected void tearDown() throws Exception + { + testCon.close(); + con.close(); + prepCon.close(); + TransportConnection.killAllVMBrokers(); + super.tearDown(); + } + + public void testCommit() throws Exception + { + expect("A" + message, consumer1.receive(1000)); + expect("B" + message, consumer1.receive(1000)); + expect("C" + message, consumer1.receive(1000)); + + //commit + session.commit(); + + //ensure sent messages can be received and received messages are gone + + testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test", tp); + testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + testConsumer1 = testSession.createConsumer(queue1); + testConsumer2 = testSession.createConsumer(queue2); + testCon.start(); + + expect("X" + message, testConsumer2.receive(1000)); + expect("Y" + message, testConsumer2.receive(1000)); + expect("Z" + message, testConsumer2.receive(1000)); + + testConsumer1 = testSession.createConsumer(queue1); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); + } + + // This checks that queue Q1 is in fact empty and does not have any stray + // messages left over from the last test (which can affect later tests)... + public void testEmpty1() throws Exception + { + assertTrue(null == consumer1.receive(1000)); + } + + public void testRollback() throws Exception + { + expect("A" + message, consumer1.receive(1000)); + expect("B" + message, consumer1.receive(1000)); + expect("C" + message, consumer1.receive(1000)); + + //rollback + session.rollback(); + + //ensure sent messages are not visible and received messages are requeued + expect("A" + message, consumer1.receive(1000)); + expect("B" + message, consumer1.receive(1000)); + expect("C" + message, consumer1.receive(1000)); + + //commit + //session.commit(); + + + testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test", tp); + testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + testConsumer1 = testSession.createConsumer(queue1); + testConsumer2 = testSession.createConsumer(queue2); + testCon.start(); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); + } + + // This checks that queue Q1 is in fact empty and does not have any stray + // messages left over from the last test (which can affect later tests)... + public void testEmpty2() throws Exception + { + assertTrue(null == consumer1.receive(1000)); + } + + private void expect(String text, Message msg) throws JMSException + { + assertTrue(msg instanceof TextMessage); + assertEquals(text, ((TextMessage) msg).getText()); + } + + // Utility to create message "012345678901234567890..." for length len chars. + private String createMessage(int len) + { + StringBuffer sb = new StringBuffer(len); + for (int i=0; i<len; i++) + sb.append(i%10); + return sb.toString(); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TransactedTest.class); + } +} |
