From 1db5a8a2329ec064d1683294ee1a3d8d233de42d Mon Sep 17 00:00:00 2001 From: Stephen Vinoski Date: Sat, 18 Nov 2006 02:12:32 +0000 Subject: directory moves required for maven merge git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@476414 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/org/apache/qpid/server/UnitTests.java | 42 --- .../src/org/apache/qpid/server/ack/TxAckTest.java | 189 ------------ .../src/org/apache/qpid/server/ack/UnitTests.java | 37 --- .../exchange/AbstractHeadersExchangeTest.java | 215 -------------- .../exchange/HeadersExchangePerformanceTest.java | 184 ------------ .../qpid/server/exchange/HeadersExchangeTest.java | 84 ------ .../org/apache/qpid/server/exchange/UnitTests.java | 35 --- .../apache/qpid/server/protocol/MockIoSession.java | 291 ------------------ .../server/protocol/TestProtocolInitiation.java | 215 -------------- .../org/apache/qpid/server/protocol/UnitTests.java | 35 --- .../src/org/apache/qpid/server/queue/AckTest.java | 324 --------------------- .../apache/qpid/server/queue/ConcurrencyTest.java | 264 ----------------- .../queue/ConcurrentDeliveryManagerTest.java | 51 ---- .../qpid/server/queue/DeliveryManagerTest.java | 162 ----------- .../qpid/server/queue/MessageTestHelper.java | 52 ---- .../qpid/server/queue/MockProtocolSession.java | 124 -------- .../qpid/server/queue/QueueConcurrentPerfTest.java | 49 ---- .../apache/qpid/server/queue/QueuePerfTest.java | 258 ---------------- .../org/apache/qpid/server/queue/SendPerfTest.java | 174 ----------- .../qpid/server/queue/SubscriptionManagerTest.java | 108 ------- .../qpid/server/queue/SubscriptionSetTest.java | 152 ---------- .../queue/SynchronizedDeliveryManagerTest.java | 52 ---- .../apache/qpid/server/queue/TestSubscription.java | 87 ------ .../org/apache/qpid/server/queue/UnitTests.java | 41 --- .../qpid/server/store/SkeletonMessageStore.java | 102 ------- .../qpid/server/store/TestReferenceCounting.java | 71 ----- .../server/store/TestableMemoryMessageStore.java | 42 --- .../org/apache/qpid/server/store/UnitTests.java | 37 --- .../org/apache/qpid/server/txn/TxnBufferTest.java | 308 -------------------- .../src/org/apache/qpid/server/txn/UnitTests.java | 37 --- .../org/apache/qpid/server/util/AveragedRun.java | 66 ----- .../apache/qpid/server/util/ConcurrentTest.java | 79 ----- .../src/org/apache/qpid/server/util/RunStats.java | 57 ---- .../qpid/server/util/TestApplicationRegistry.java | 107 ------- .../src/org/apache/qpid/server/util/TimedRun.java | 52 ---- .../test/unit/ack/DisconnectAndRedeliverTest.java | 237 --------------- .../org/apache/qpid/test/unit/ack/UnitTests.java | 35 --- .../java/org/apache/qpid/server/UnitTests.java | 42 +++ .../java/org/apache/qpid/server/ack/TxAckTest.java | 189 ++++++++++++ .../java/org/apache/qpid/server/ack/UnitTests.java | 37 +++ .../exchange/AbstractHeadersExchangeTest.java | 215 ++++++++++++++ .../exchange/HeadersExchangePerformanceTest.java | 184 ++++++++++++ .../qpid/server/exchange/HeadersExchangeTest.java | 84 ++++++ .../org/apache/qpid/server/exchange/UnitTests.java | 35 +++ .../apache/qpid/server/protocol/MockIoSession.java | 291 ++++++++++++++++++ .../server/protocol/TestProtocolInitiation.java | 215 ++++++++++++++ .../org/apache/qpid/server/protocol/UnitTests.java | 35 +++ .../java/org/apache/qpid/server/queue/AckTest.java | 324 +++++++++++++++++++++ .../apache/qpid/server/queue/ConcurrencyTest.java | 264 +++++++++++++++++ .../queue/ConcurrentDeliveryManagerTest.java | 51 ++++ .../qpid/server/queue/DeliveryManagerTest.java | 162 +++++++++++ .../qpid/server/queue/MessageTestHelper.java | 52 ++++ .../qpid/server/queue/MockProtocolSession.java | 124 ++++++++ .../qpid/server/queue/QueueConcurrentPerfTest.java | 49 ++++ .../apache/qpid/server/queue/QueuePerfTest.java | 258 ++++++++++++++++ .../org/apache/qpid/server/queue/SendPerfTest.java | 174 +++++++++++ .../qpid/server/queue/SubscriptionManagerTest.java | 108 +++++++ .../qpid/server/queue/SubscriptionSetTest.java | 152 ++++++++++ .../queue/SynchronizedDeliveryManagerTest.java | 52 ++++ .../apache/qpid/server/queue/TestSubscription.java | 87 ++++++ .../org/apache/qpid/server/queue/UnitTests.java | 41 +++ .../qpid/server/store/SkeletonMessageStore.java | 102 +++++++ .../qpid/server/store/TestReferenceCounting.java | 71 +++++ .../server/store/TestableMemoryMessageStore.java | 42 +++ .../org/apache/qpid/server/store/UnitTests.java | 37 +++ .../org/apache/qpid/server/txn/TxnBufferTest.java | 308 ++++++++++++++++++++ .../java/org/apache/qpid/server/txn/UnitTests.java | 37 +++ .../org/apache/qpid/server/util/AveragedRun.java | 66 +++++ .../apache/qpid/server/util/ConcurrentTest.java | 79 +++++ .../java/org/apache/qpid/server/util/RunStats.java | 57 ++++ .../qpid/server/util/TestApplicationRegistry.java | 107 +++++++ .../java/org/apache/qpid/server/util/TimedRun.java | 52 ++++ .../test/unit/ack/DisconnectAndRedeliverTest.java | 237 +++++++++++++++ .../org/apache/qpid/test/unit/ack/UnitTests.java | 35 +++ 74 files changed, 4455 insertions(+), 4455 deletions(-) delete mode 100644 java/systests/src/org/apache/qpid/server/UnitTests.java delete mode 100644 java/systests/src/org/apache/qpid/server/ack/TxAckTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/ack/UnitTests.java delete mode 100644 java/systests/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/exchange/UnitTests.java delete mode 100644 java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java delete mode 100644 java/systests/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java delete mode 100644 java/systests/src/org/apache/qpid/server/protocol/UnitTests.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/AckTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/ConcurrencyTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/DeliveryManagerTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/MessageTestHelper.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/MockProtocolSession.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/QueuePerfTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/SendPerfTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/SubscriptionManagerTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/SubscriptionSetTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/TestSubscription.java delete mode 100644 java/systests/src/org/apache/qpid/server/queue/UnitTests.java delete mode 100644 java/systests/src/org/apache/qpid/server/store/SkeletonMessageStore.java delete mode 100644 java/systests/src/org/apache/qpid/server/store/TestReferenceCounting.java delete mode 100644 java/systests/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java delete mode 100644 java/systests/src/org/apache/qpid/server/store/UnitTests.java delete mode 100644 java/systests/src/org/apache/qpid/server/txn/TxnBufferTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/txn/UnitTests.java delete mode 100644 java/systests/src/org/apache/qpid/server/util/AveragedRun.java delete mode 100644 java/systests/src/org/apache/qpid/server/util/ConcurrentTest.java delete mode 100644 java/systests/src/org/apache/qpid/server/util/RunStats.java delete mode 100644 java/systests/src/org/apache/qpid/server/util/TestApplicationRegistry.java delete mode 100644 java/systests/src/org/apache/qpid/server/util/TimedRun.java delete mode 100644 java/systests/src/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java delete mode 100644 java/systests/src/org/apache/qpid/test/unit/ack/UnitTests.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/UnitTests.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/ack/UnitTests.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/exchange/UnitTests.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/protocol/UnitTests.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/queue/UnitTests.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/store/UnitTests.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/txn/UnitTests.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java create mode 100644 java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java create mode 100644 java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java create mode 100644 java/systests/src/test/java/org/apache/qpid/test/unit/ack/UnitTests.java (limited to 'java/systests') diff --git a/java/systests/src/org/apache/qpid/server/UnitTests.java b/java/systests/src/org/apache/qpid/server/UnitTests.java deleted file mode 100644 index 1f4af59ce7..0000000000 --- a/java/systests/src/org/apache/qpid/server/UnitTests.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server; - -import junit.framework.JUnit4TestAdapter; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -@RunWith(Suite.class) -@Suite.SuiteClasses({ - org.apache.qpid.server.ack.UnitTests.class, - org.apache.qpid.server.exchange.UnitTests.class, - org.apache.qpid.server.protocol.UnitTests.class, - org.apache.qpid.server.queue.UnitTests.class, - org.apache.qpid.server.store.UnitTests.class, - org.apache.qpid.server.txn.UnitTests.class - }) -public class UnitTests -{ - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(UnitTests.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/org/apache/qpid/server/ack/TxAckTest.java deleted file mode 100644 index fe2c189c39..0000000000 --- a/java/systests/src/org/apache/qpid/server/ack/TxAckTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.ack; - -import junit.framework.JUnit4TestAdapter; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import org.junit.Before; -import org.junit.Test; -import org.junit.Ignore; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.store.TestableMemoryMessageStore; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; - -public class TxAckTest -{ - private Scenario individual; - private Scenario multiple; - private Scenario combined; - - @Before - public void setup() throws Exception - { - //ack only 5th msg - individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l)); - individual.update(5, false); - - //ack all up to and including 5th msg - multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l)); - multiple.update(5, true); - - //leave only 8th and 9th unacked - combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l)); - combined.update(3, false); - combined.update(5, true); - combined.update(7, true); - combined.update(2, true);//should be ignored - combined.update(1, false);//should be ignored - combined.update(10, false); - } - - @Test - public void prepare() throws AMQException - { - individual.prepare(); - multiple.prepare(); - combined.prepare(); - } - - @Test - public void undoPrepare() throws AMQException - { - individual.undoPrepare(); - multiple.undoPrepare(); - combined.undoPrepare(); - } - - @Test - public void commit() throws AMQException - { - individual.commit(); - multiple.commit(); - combined.commit(); - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(TxAckTest.class); - } - - private class Scenario - { - private final LinkedHashMap _messages = new LinkedHashMap(); - private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(_messages, _messages); - private final TxAck _op = new TxAck(_map); - private final List _acked; - private final List _unacked; - - Scenario(int messageCount, List acked, List unacked) - { - for(int i = 0; i < messageCount; i++) - { - long deliveryTag = i + 1; - _messages.put(deliveryTag, new UnacknowledgedMessage(null, new TestMessage(deliveryTag), null, deliveryTag)); - } - _acked = acked; - _unacked = unacked; - } - - void update(long deliverytag, boolean multiple) - { - _op.update(deliverytag, multiple); - } - - private void assertCount(List tags, int expected) - { - for(long tag : tags) - { - UnacknowledgedMessage u = _messages.get(tag); - assertTrue("Message not found for tag " + tag, u != null); - ((TestMessage) u.message).assertCountEquals(expected); - } - } - - void prepare() throws AMQException - { - _op.consolidate(); - _op.prepare(); - - assertCount(_acked, -1); - assertCount(_unacked, 0); - - } - void undoPrepare() - { - _op.consolidate(); - _op.undoPrepare(); - - assertCount(_acked, 1); - assertCount(_unacked, 0); - } - - void commit() - { - _op.consolidate(); - _op.commit(); - - - //check acked messages are removed from map - HashSet keys = new HashSet(_messages.keySet()); - keys.retainAll(_acked); - assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty()); - //check unacked messages are still in map - keys = new HashSet(_unacked); - keys.removeAll(_messages.keySet()); - assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty()); - } - } - - private class TestMessage extends AMQMessage - { - private final long _tag; - private int _count; - - TestMessage(long tag) - { - super(new TestableMemoryMessageStore(), null); - _tag = tag; - } - - public void incrementReference() - { - _count++; - } - - public void decrementReference() - { - _count--; - } - - void assertCountEquals(int expected) - { - assertEquals("Wrong count for message with tag " + _tag, expected, _count); - } - } -} diff --git a/java/systests/src/org/apache/qpid/server/ack/UnitTests.java b/java/systests/src/org/apache/qpid/server/ack/UnitTests.java deleted file mode 100644 index 45ffe1fd6c..0000000000 --- a/java/systests/src/org/apache/qpid/server/ack/UnitTests.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.ack; - -import junit.framework.JUnit4TestAdapter; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -@RunWith(Suite.class) -@Suite.SuiteClasses({ - TxAckTest.class -}) -public class UnitTests -{ - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(UnitTests.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java b/java/systests/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java deleted file mode 100644 index 1bb1d12e1c..0000000000 --- a/java/systests/src/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.NoConsumersException; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.AMQException; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; - -import java.util.List; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Set; -import java.util.HashSet; - -public class AbstractHeadersExchangeTest -{ - private final HeadersExchange exchange = new HeadersExchange(); - protected final Set queues = new HashSet(); - private int count; - - protected TestQueue bindDefault(String... bindings) throws AMQException - { - return bind("Queue" + (++count), bindings); - } - - protected TestQueue bind(String queueName, String... bindings) throws AMQException - { - return bind(queueName, getHeaders(bindings)); - } - - protected TestQueue bind(String queue, FieldTable bindings) throws AMQException - { - return bind(new TestQueue(queue), bindings); - } - - protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException - { - return bind(queue, getHeaders(bindings)); - } - - protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException - { - queues.add(queue); - exchange.registerQueue(null, queue, bindings); - return queue; - } - - - protected void route(Message m) throws AMQException - { - m.route(exchange); - } - - protected void routeAndTest(Message m, TestQueue... expected) throws AMQException - { - routeAndTest(m, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, List expected) throws AMQException - { - route(m); - for (TestQueue q : queues) - { - if (expected.contains(q)) - { - assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; - } - } - } - - static FieldTable getHeaders(String... entries) - { - FieldTable headers = new FieldTable(); - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.put(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - static BasicPublishBody getPublishRequest(String id) - { - BasicPublishBody request = new BasicPublishBody(); - request.routingKey = id; - return request; - } - - static ContentHeaderBody getContentHeader(FieldTable headers) - { - ContentHeaderBody header = new ContentHeaderBody(); - header.properties = getProperties(headers); - return header; - } - - static BasicContentHeaderProperties getProperties(FieldTable headers) - { - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - properties.setHeaders(headers); - return properties; - } - - static class TestQueue extends AMQQueue - { - final List messages = new ArrayList(); - - public TestQueue(String name) throws AMQException - { - super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry()); - } - - public void deliver(AMQMessage msg) throws AMQException - { - messages.add(new HeadersExchangeTest.Message(msg)); - } - } - - /** - * Just add some extra utility methods to AMQMessage to aid testing. - */ - static class Message extends AMQMessage - { - private static MessageStore _messageStore = new SkeletonMessageStore(); - - Message(String id, String... headers) throws AMQException - { - this(id, getHeaders(headers)); - } - - Message(String id, FieldTable headers) throws AMQException - { - this(getPublishRequest(id), getContentHeader(headers), null); - } - - private Message(BasicPublishBody publish, ContentHeaderBody header, List bodies) throws AMQException - { - super(_messageStore, publish, header, bodies); - } - - private Message(AMQMessage msg) throws AMQException - { - super(msg); - } - - void route(Exchange exchange) throws AMQException - { - exchange.route(this); - } - - boolean isInQueue(TestQueue queue) - { - return queue.messages.contains(this); - } - - public int hashCode() - { - return getKey().hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); - } - - private boolean equals(HeadersExchangeTest.Message m) - { - return getKey().equals(m.getKey()); - } - - public String toString() - { - return getKey().toString(); - } - - private Object getKey() - { - return getPublishBody().routingKey; - } - } -} diff --git a/java/systests/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/java/systests/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java deleted file mode 100644 index ff0d58ad69..0000000000 --- a/java/systests/src/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.NoConsumersException; -import org.apache.qpid.server.util.TimedRun; -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; - -import java.util.List; - -/** - * Want to vary the number of regsitrations, messages and matches and measure - * the corresponding variance in execution time. - *

- * Each registration will contain the 'All' header, even registrations will - * contain the 'Even' header and odd headers will contain the 'Odd' header. - * In additions each regsitration will have a unique value for the 'Specific' - * header as well. - *

- * Messages can then be routed to all registrations, to even- or odd- registrations - * or to a specific registration. - * - */ -public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest -{ - private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC} - - private final TestQueue[] queues; - private final Mode mode; - - public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException - { - this.mode = mode; - queues = new TestQueue[registrations]; - for (int i = 0; i < queues.length; i++) - { - switch(mode) - { - case ALL: - queues[i] = bind(new FastQueue("Queue" + i), "All"); - break; - case ODD_OR_EVEN: - queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i)); - break; - case SPECIFIC: - queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i); - break; - } - } - } - - void sendToAll(int count) throws AMQException - { - send(count, "All=True"); - } - - void sendToOdd(int count) throws AMQException - { - send(count, "All=True", "Odd=True"); - } - - void sendToEven(int count) throws AMQException - { - send(count, "All=True", "Even=True"); - } - - void sendToAllSpecifically(int count) throws AMQException - { - for (int i = 0; i < queues.length; i++) - { - sendToSpecific(count, i); - } - } - - void sendToSpecific(int count, int index) throws AMQException - { - send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index); - } - - private void send(int count, String... headers) throws AMQException - { - for (int i = 0; i < count; i++) - { - route(new Message("Message" + i, headers)); - } - } - - private static String oddOrEven(int i) - { - return (i % 2 == 0 ? "Even" : "Odd"); - } - - static class FastQueue extends TestQueue - { - - public FastQueue(String name) throws AMQException - { - super(name); - } - - public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List contentBodies) throws NoConsumersException - { - //just discard as we are not testing routing functionality here - } - } - - static class Test extends TimedRun - { - private final Mode mode; - private final int registrations; - private final int count; - private HeadersExchangePerformanceTest test; - - Test(Mode mode, int registrations, int count) - { - super(mode + ", registrations=" + registrations + ", count=" + count); - this.mode = mode; - this.registrations = registrations; - this.count = count; - } - - protected void setup() throws Exception - { - test = new HeadersExchangePerformanceTest(mode, registrations); - run(100); //do a warm up run before times start - } - - protected void teardown() throws Exception - { - test = null; - System.gc(); - } - - protected void run() throws Exception - { - run(count); - } - - private void run(int count) throws Exception - { - switch(mode) - { - case ALL: - test.sendToAll(count); - break; - default: - System.out.println("Test for " + mode + " not yet implemented."); - } - } - } - - public static void main(String[] argv) throws Exception - { - int registrations = Integer.parseInt(argv[0]); - int messages = Integer.parseInt(argv[1]); - int iterations = Integer.parseInt(argv[2]); - TimedRun test = new Test(Mode.ALL, registrations, messages); - AveragedRun tests = new AveragedRun(test, iterations); - System.out.println(tests.call()); - } -} - diff --git a/java/systests/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java deleted file mode 100644 index 7d80a6dce7..0000000000 --- a/java/systests/src/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.junit.Test; -import org.junit.Before; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.TestApplicationRegistry; -import junit.framework.JUnit4TestAdapter; - -public class HeadersExchangeTest extends AbstractHeadersExchangeTest -{ - @Before - public void init() throws Exception - { - ApplicationRegistry.initialise(new TestApplicationRegistry()); - } - - @Test - public void simple() throws AMQException - { - TestQueue q1 = bindDefault("F0000"); - TestQueue q2 = bindDefault("F0000=Aardvark"); - TestQueue q3 = bindDefault("F0001"); - TestQueue q4 = bindDefault("F0001=Bear"); - TestQueue q5 = bindDefault("F0000", "F0001"); - TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear"); - TestQueue q7 = bindDefault("F0000", "F0001=Bear"); - TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); - TestQueue q9 = bindDefault("F0000=Apple", "F0001=Banana"); - TestQueue q10 = bindDefault("F0000=Apple", "F0001"); - - routeAndTest(new Message("Message1", "F0000"), q1); - routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); - routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); - routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), - q1, q2, q3, q4, q5, q6, q7, q8); - routeAndTest(new Message("Message6", "F0002")); - } - - @Test - public void any() throws AMQException - { - TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any"); - TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any"); - TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any"); - TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); - TestQueue q5 = bindDefault("F0000=Apple", "F0001=Banana", "X-match=any"); - TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); - - routeAndTest(new Message("Message1", "F0000"), q1, q3); - routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4); - routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message6", "F0002")); - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(HeadersExchangeTest.class); - } - -} diff --git a/java/systests/src/org/apache/qpid/server/exchange/UnitTests.java b/java/systests/src/org/apache/qpid/server/exchange/UnitTests.java deleted file mode 100644 index 9ad3b65008..0000000000 --- a/java/systests/src/org/apache/qpid/server/exchange/UnitTests.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import junit.framework.JUnit4TestAdapter; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -@RunWith(Suite.class) -@Suite.SuiteClasses({HeadersExchangeTest.class}) -public class UnitTests -{ - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(UnitTests.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java deleted file mode 100644 index 28047832b8..0000000000 --- a/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.mina.common.*; -import org.apache.mina.common.support.DefaultCloseFuture; -import org.apache.mina.common.support.DefaultWriteFuture; - -import java.net.SocketAddress; -import java.util.Set; - -public class MockIoSession implements IoSession -{ - private AMQProtocolSession _protocolSession; - - /** - * Stores the last response written - */ - private Object _lastWrittenObject; - - private boolean _closing; - - public MockIoSession() - { - } - - public Object getLastWrittenObject() - { - return _lastWrittenObject; - } - - public IoService getService() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public IoHandler getHandler() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public IoSessionConfig getConfig() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public IoFilterChain getFilterChain() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public WriteFuture write(Object message) - { - WriteFuture wf = new DefaultWriteFuture(null); - _lastWrittenObject = message; - return wf; - } - - public CloseFuture close() - { - _closing = true; - CloseFuture cf = new DefaultCloseFuture(null); - cf.setClosed(); - return cf; - } - - public Object getAttachment() - { - return _protocolSession; - } - - public Object setAttachment(Object attachment) - { - Object current = _protocolSession; - _protocolSession = (AMQProtocolSession) attachment; - return current; - } - - public Object getAttribute(String key) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public Object setAttribute(String key, Object value) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public Object setAttribute(String key) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public Object removeAttribute(String key) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean containsAttribute(String key) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public Set getAttributeKeys() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public TransportType getTransportType() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isConnected() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isClosing() - { - return _closing; - } - - public CloseFuture getCloseFuture() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public SocketAddress getRemoteAddress() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public SocketAddress getLocalAddress() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public SocketAddress getServiceAddress() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getIdleTime(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getIdleTimeInMillis(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setIdleTime(IdleStatus status, int idleTime) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public int getWriteTimeout() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getWriteTimeoutInMillis() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setWriteTimeout(int writeTimeout) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public TrafficMask getTrafficMask() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setTrafficMask(TrafficMask trafficMask) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void suspendRead() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void suspendWrite() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void resumeRead() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void resumeWrite() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public long getReadBytes() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getWrittenBytes() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getReadMessages() - { - return 0L; - } - - public long getWrittenMessages() - { - return 0L; - } - - public long getWrittenWriteRequests() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getScheduledWriteMessages() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getScheduledWriteBytes() - { - return 0; //TODO - } - - public long getCreationTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastIoTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastReadTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastWriteTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isIdle(IdleStatus status) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getIdleCount(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastIdleTime(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } -} diff --git a/java/systests/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/java/systests/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java deleted file mode 100644 index 847c3cc51b..0000000000 --- a/java/systests/src/org/apache/qpid/server/protocol/TestProtocolInitiation.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import junit.framework.Assert; -import junit.framework.JUnit4TestAdapter; -import org.apache.qpid.codec.AMQDecoder; -import org.apache.qpid.codec.AMQEncoder; -import org.apache.qpid.framing.*; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.WriteFuture; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; -import org.apache.mina.filter.codec.ProtocolEncoderOutput; -import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; -import org.junit.Before; -import org.junit.Test; - -/** - * This test suite tests the handling of protocol initiation frames and related issues. - */ -public class TestProtocolInitiation implements ProtocolVersionList -{ - private AMQPFastProtocolHandler _protocolHandler; - - private MockIoSession _mockIoSession; - - /** - * We need to use the object encoder mechanism so to allow us to retrieve the - * output (a bytebuffer) we define our own encoder output class. The encoder - * writes the encoded data to this class, from where we can retrieve it during - * the test run. - */ - private class TestProtocolEncoderOutput implements ProtocolEncoderOutput - { - public ByteBuffer result; - - public void write(ByteBuffer buf) - { - result = buf; - } - - public void mergeAll() - { - throw new UnsupportedOperationException(); - } - - public WriteFuture flush() - { - throw new UnsupportedOperationException(); - } - } - - private class TestProtocolDecoderOutput implements ProtocolDecoderOutput - { - public Object result; - - public void write(Object buf) - { - result = buf; - } - - public void flush() - { - throw new UnsupportedOperationException(); - } - } - - @Before - public void createCommonObjects() - { - _mockIoSession = new MockIoSession(); - _protocolHandler = new AMQPFastProtocolHandler(null, null); - } - - - /** - * Tests that the AMQDecoder handles invalid protocol classes - * @throws Exception - */ - @Test(expected = AMQProtocolClassException.class) - public void testDecoderValidateProtocolClass() throws Exception - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolClass = 2; - decodePI(pi); - } - - /** - * Tests that the AMQDecoder handles invalid protocol instance numbers - * @throws Exception - */ - @Test(expected = AMQProtocolInstanceException.class) - public void testDecoderValidatesProtocolInstance() throws Exception - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolInstance = 2; - decodePI(pi); - } - - /** - * Tests that the AMQDecoder handles invalid protocol major - * @throws Exception - */ - @Test(expected = AMQProtocolVersionException.class) - public void testDecoderValidatesProtocolMajor() throws Exception - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolMajor = 2; - decodePI(pi); - } - - /** - * Tests that the AMQDecoder handles invalid protocol minor - * @throws Exception - */ - @Test(expected = AMQProtocolVersionException.class) - public void testDecoderValidatesProtocolMinor() throws Exception - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolMinor = 99; - decodePI(pi); - } - - /** - * Tests that the AMQDecoder accepts a valid PI - * @throws Exception - */ - @Test(expected = AMQProtocolHeaderException.class) - public void testDecoderValidatesHeader() throws Exception - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.header = new char[] {'P', 'Q', 'M', 'A' }; - decodePI(pi); - } - - /** - * Test that a valid header is passed by the decoder. - * @throws Exception - */ - @Test - public void testDecoderAcceptsValidHeader() throws Exception - { - ProtocolInitiation pi = createValidProtocolInitiation(); - decodePI(pi); - } - - /** - * This test checks that an invalid protocol header results in the - * connection being closed. - */ - @Test - public void testInvalidProtocolHeaderClosesConnection() throws Exception - { - AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test"); - _protocolHandler.exceptionCaught(_mockIoSession, pe); - Assert.assertNotNull(_mockIoSession.getLastWrittenObject()); - Object piResponse = _mockIoSession.getLastWrittenObject(); - Assert.assertEquals(piResponse.getClass(), ProtocolInitiation.class); - ProtocolInitiation pi = (ProtocolInitiation) piResponse; - Assert.assertEquals("Protocol Initiation sent out was not the broker's expected header", pi, - createValidProtocolInitiation()); - Assert.assertTrue("Session has not been closed", _mockIoSession.isClosing()); - } - - private ProtocolInitiation createValidProtocolInitiation() - { - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - int i = pv.length - 1; - return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]); - } - - /** - * Helper that encodes a protocol initiation and attempts to decode it - * @param pi - * @throws Exception - */ - private void decodePI(ProtocolInitiation pi) throws Exception - { - // we need to do this test at the level of the decoder since we initially only expect PI frames - // so the protocol handler is not set up to know whether it should be expecting a PI frame or - // a different type of frame - AMQDecoder decoder = new AMQDecoder(true); - AMQEncoder encoder = new AMQEncoder(); - TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput(); - encoder.encode(_mockIoSession, pi, peo); - TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput(); - decoder.decode(_mockIoSession, peo.result, pdo); - ((ProtocolInitiation) pdo.result).checkVersion(this); - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(TestProtocolInitiation.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/protocol/UnitTests.java b/java/systests/src/org/apache/qpid/server/protocol/UnitTests.java deleted file mode 100644 index 53038a4058..0000000000 --- a/java/systests/src/org/apache/qpid/server/protocol/UnitTests.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import junit.framework.JUnit4TestAdapter; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -@RunWith(Suite.class) -@Suite.SuiteClasses({TestProtocolInitiation.class}) -public class UnitTests -{ - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(UnitTests.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/org/apache/qpid/server/queue/AckTest.java deleted file mode 100644 index fe04bd43e1..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/AckTest.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.JUnit4TestAdapter; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.ack.UnacknowledgedMessage; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.TestApplicationRegistry; -import static org.junit.Assert.assertTrue; -import org.junit.Before; -import org.junit.Test; - -import java.util.Iterator; -import java.util.Map; - -/** - * Tests that acknowledgements are handled correctly. - */ -public class AckTest -{ - private static final Logger _log = Logger.getLogger(AckTest.class); - - private SubscriptionImpl _subscription; - - private MockProtocolSession _protocolSession; - - private TestableMemoryMessageStore _messageStore; - - private AMQChannel _channel; - - private SubscriptionSet _subscriptionManager; - - private AMQQueue _queue; - - public AckTest() throws Exception - { - ApplicationRegistry.initialise(new TestApplicationRegistry()); - } - - @Before - public void setup() throws Exception - { - _messageStore = new TestableMemoryMessageStore(); - _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/); - _protocolSession = new MockProtocolSession(_messageStore); - _protocolSession.addChannel(_channel); - _subscriptionManager = new SubscriptionSet(); - _queue = new AMQQueue("myQ", false, "guest", true, new DefaultQueueRegistry(), _subscriptionManager); - } - - private void publishMessages(int count) throws AMQException - { - publishMessages(count, false); - } - - private void publishMessages(int count, boolean persistent) throws AMQException - { - for (int i = 1; i <= count; i++) - { - BasicPublishBody publishBody = new BasicPublishBody(); - publishBody.routingKey = "rk"; - publishBody.exchange = "someExchange"; - AMQMessage msg = new AMQMessage(_messageStore, publishBody); - if (persistent) - { - BasicContentHeaderProperties b = new BasicContentHeaderProperties(); - //This is DeliveryMode.PERSISTENT - b.setDeliveryMode((byte) 2); - ContentHeaderBody cb = new ContentHeaderBody(); - cb.properties = b; - msg.setContentHeaderBody(cb); - } - else - { - msg.setContentHeaderBody(new ContentHeaderBody()); - } - _subscription.send(msg, _queue); - } - } - - /** - * Tests that the acknowledgements are correctly associated with a channel and - * order is preserved when acks are enabled - */ - @Test - public void ackChannelAssociationTest() throws AMQException - { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); - final int msgCount = 10; - publishMessages(msgCount, true); - - Map map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMap().size() == msgCount); - - Iterator> it = map.entrySet().iterator(); - for (int i = 1; i <= map.size(); i++) - { - Map.Entry entry = it.next(); - assertTrue(entry.getKey() == i); - UnacknowledgedMessage unackedMsg = entry.getValue(); - assertTrue(unackedMsg.queue == _queue); - } - - assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMap().size() == msgCount); - } - - /** - * Tests that in no-ack mode no messages are retained - */ - @Test - public void testNoAckMode() throws AMQException - { - // false arg means no acks expected - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false); - final int msgCount = 10; - publishMessages(msgCount); - - Map map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMap().size() == 0); - } - - /** - * Tests that a single acknowledgement is handled correctly (i.e multiple flag not - * set case) - */ - @Test - public void singleAckReceivedTest() throws AMQException - { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); - final int msgCount = 10; - publishMessages(msgCount); - - _channel.acknowledgeMessage(5, false); - Map map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == msgCount - 1); - - Iterator> it = map.entrySet().iterator(); - int i = 1; - while (i <= map.size()) - { - Map.Entry entry = it.next(); - assertTrue(entry.getKey() == i); - UnacknowledgedMessage unackedMsg = entry.getValue(); - assertTrue(unackedMsg.queue == _queue); - // 5 is the delivery tag of the message that *should* be removed - if (++i == 5) - { - ++i; - } - } - } - - /** - * Tests that a single acknowledgement is handled correctly (i.e multiple flag not - * set case) - */ - @Test - public void multiAckReceivedTest() throws AMQException - { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); - final int msgCount = 10; - publishMessages(msgCount); - - _channel.acknowledgeMessage(5, true); - Map map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 5); - - Iterator> it = map.entrySet().iterator(); - int i = 1; - while (i <= map.size()) - { - Map.Entry entry = it.next(); - assertTrue(entry.getKey() == i + 5); - UnacknowledgedMessage unackedMsg = entry.getValue(); - assertTrue(unackedMsg.queue == _queue); - ++i; - } - } - - /** - * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. - */ - @Test - public void multiAckAllReceivedTest() throws AMQException - { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); - final int msgCount = 10; - publishMessages(msgCount); - - _channel.acknowledgeMessage(0, true); - Map map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 0); - - Iterator> it = map.entrySet().iterator(); - int i = 1; - while (i <= map.size()) - { - Map.Entry entry = it.next(); - assertTrue(entry.getKey() == i + 5); - UnacknowledgedMessage unackedMsg = entry.getValue(); - assertTrue(unackedMsg.queue == _queue); - ++i; - } - } - - - @Test - public void testPrefetchHighLow() throws AMQException - { - int lowMark = 5; - int highMark = 10; - - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); - _channel.setPrefetchLowMarkCount(lowMark); - _channel.setPrefetchHighMarkCount(highMark); - - assertTrue(_channel.getPrefetchLowMarkCount() == lowMark); - assertTrue(_channel.getPrefetchHighMarkCount() == highMark); - - publishMessages(highMark); - - // at this point we should have sent out only highMark messages - // which have not bee received so will be queued up in the channel - // which should be suspended - assertTrue(_subscription.isSuspended()); - Map map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == highMark); - - //acknowledge messages so we are just above lowMark - _channel.acknowledgeMessage(lowMark - 1, true); - - //we should still be suspended - assertTrue(_subscription.isSuspended()); - assertTrue(map.size() == lowMark + 1); - - //acknowledge one more message - _channel.acknowledgeMessage(lowMark, true); - - //and suspension should be lifted - assertTrue(!_subscription.isSuspended()); - - //pubilsh more msgs so we are just below the limit - publishMessages(lowMark - 1); - - //we should not be suspended - assertTrue(!_subscription.isSuspended()); - - //acknowledge all messages - _channel.acknowledgeMessage(0, true); - try - { - Thread.sleep(3000); - } - catch (InterruptedException e) - { - _log.error("Error: " + e, e); - } - //map will be empty - assertTrue(map.size() == 0); - } - - @Test - public void testPrefetch() throws AMQException - { - _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); - _channel.setPrefetchCount(5); - - assertTrue(_channel.getPrefetchCount() == 5); - - final int msgCount = 5; - publishMessages(msgCount); - - // at this point we should have sent out only 5 messages with a further 5 queued - // up in the channel which should now be suspended - assertTrue(_subscription.isSuspended()); - Map map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 5); - _channel.acknowledgeMessage(5, true); - assertTrue(!_subscription.isSuspended()); - try - { - Thread.sleep(3000); - } - catch (InterruptedException e) - { - _log.error("Error: " + e, e); - } - assertTrue(map.size() == 0); - } - - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(AckTest.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/org/apache/qpid/server/queue/ConcurrencyTest.java deleted file mode 100644 index 60d86a14a7..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/ConcurrencyTest.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.JUnit4TestAdapter; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import org.junit.Test; -import org.junit.Assert; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; - -import java.util.*; -import java.util.concurrent.Executor; - -/** - * Tests delivery in the face of concurrent incoming _messages, subscription alterations - * and attempts to asynchronously process queued _messages. - */ -public class ConcurrencyTest extends MessageTestHelper -{ - private final Random random = new Random(); - - private final int numMessages = 1000; - - private final List _subscribers = new ArrayList(); - private final Set _active = new HashSet(); - private final List _messages = new ArrayList(); - private int next = 0;//index to next message to send - private final List _received = Collections.synchronizedList(new ArrayList()); - private final Executor _executor = new OnCurrentThreadExecutor(); - private final List _threads = new ArrayList(); - - private final SubscriptionSet _subscriptionMgr = new SubscriptionSet(); - private final DeliveryManager _deliveryMgr; - - private boolean isComplete; - private boolean failed; - - public ConcurrencyTest() throws Exception - { - _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false, - new DefaultQueueRegistry())); - } - - @Test - public void concurrent1() throws InterruptedException, AMQException - { - initSubscriptions(10); - initMessages(numMessages); - initThreads(1, 4, 4, 4); - run(); - check(); - } - - @Test - public void concurrent2() throws InterruptedException, AMQException - { - initSubscriptions(10); - initMessages(numMessages); - initThreads(4, 2, 2, 2); - run(); - check(); - } - - void check() - { - assertFalse("Failed", failed); - - _deliveryMgr.processAsync(_executor); - - assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size()); - for(int i = 0; i < _messages.size(); i++) - { - assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i)); - } - } - - void initSubscriptions(int subscriptions) - { - for(int i = 0; i < subscriptions; i++) - { - _subscribers.add(new TestSubscription("Subscriber" + i, _received)); - } - } - - void initMessages(int messages) throws AMQException - { - for(int i = 0; i < messages; i++) - { - _messages.add(message()); - } - } - - void initThreads(int senders, int subscribers, int suspenders, int processors) - { - addThreads(senders, senders == 1 ? new Sender() : new OrderedSender()); - addThreads(subscribers, new Subscriber()); - addThreads(suspenders, new Suspender()); - addThreads(processors, new Processor()); - } - - void addThreads(int count, Runnable runner) - { - for(int i = 0; i < count; i++) - { - _threads.add(new Thread(runner, runner.toString())); - } - } - - void run() throws InterruptedException - { - for(Thread t : _threads) - { - t.start(); - } - - for(Thread t : _threads) - { - t.join(); - } - } - - private void toggle(Subscription s) - { - synchronized (_active) - { - if (_active.contains(s)) - { - _active.remove(s); - Subscription result = _subscriptionMgr.removeSubscriber(s); - Assert.assertTrue("Removed subscription " + result + " but trying to remove subscription " + s, - result != null && result.equals(s)); - } - else - { - _active.add(s); - _subscriptionMgr.addSubscriber(s); - } - } - } - - private AMQMessage nextMessage() - { - synchronized (_messages) - { - if (next < _messages.size()) - { - return _messages.get(next++); - } - else - { - if (!_deliveryMgr.hasQueuedMessages()) { - isComplete = true; - } - return null; - } - } - } - - private boolean randomBoolean() - { - return random.nextBoolean(); - } - - private TestSubscription randomSubscriber() - { - return _subscribers.get(random.nextInt(_subscribers.size())); - } - - private class Sender extends Runner - { - void doRun() throws Throwable - { - AMQMessage msg = nextMessage(); - if (msg != null) - { - _deliveryMgr.deliver(toString(), msg); - } - } - } - - private class OrderedSender extends Sender - { - synchronized void doRun() throws Throwable - { - super.doRun(); - } - } - - private class Suspender extends Runner - { - void doRun() throws Throwable - { - randomSubscriber().setSuspended(randomBoolean()); - } - } - - private class Subscriber extends Runner - { - void doRun() throws Throwable - { - toggle(randomSubscriber()); - } - } - - private class Processor extends Runner - { - void doRun() throws Throwable - { - _deliveryMgr.processAsync(_executor); - } - } - - private abstract class Runner implements Runnable - { - public void run() - { - try - { - while (!stop()) - { - doRun(); - } - } - catch (Throwable t) - { - failed = true; - t.printStackTrace(); - } - } - - abstract void doRun() throws Throwable; - - boolean stop() - { - return isComplete || failed; - } - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(ConcurrencyTest.class); - } - -} diff --git a/java/systests/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/java/systests/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java deleted file mode 100644 index 9d841d86f5..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.queue.ConcurrentDeliveryManager; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.DeliveryManagerTest; -import org.apache.qpid.AMQException; -import junit.framework.JUnit4TestAdapter; - -public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest -{ - public ConcurrentDeliveryManagerTest() throws Exception - { - try - { - System.setProperty("concurrentdeliverymanager","true"); - _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, - new DefaultQueueRegistry())); - } - catch (Throwable t) - { - t.printStackTrace(); - throw new AMQException("Could not initialise delivery manager", t); - } - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(ConcurrentDeliveryManagerTest.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/org/apache/qpid/server/queue/DeliveryManagerTest.java deleted file mode 100644 index a6739223b0..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/DeliveryManagerTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import org.junit.Test; -import org.junit.runners.Suite; -import org.junit.runner.RunWith; -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.AMQException; -import junit.framework.JUnit4TestAdapter; - -@RunWith(Suite.class) -@Suite.SuiteClasses({ - ConcurrentDeliveryManagerTest.class, - SynchronizedDeliveryManagerTest.class}) - -abstract public class DeliveryManagerTest extends MessageTestHelper -{ - protected final SubscriptionSet _subscriptions = new SubscriptionSet(); - protected DeliveryManager _mgr; - - public DeliveryManagerTest() throws Exception - { - } - - - @Test - public void startInQueueingMode() throws AMQException - { - AMQMessage[] messages = new AMQMessage[10]; - for (int i = 0; i < messages.length; i++) - { - messages[i] = message(); - } - int batch = messages.length / 2; - - for (int i = 0; i < batch; i++) - { - _mgr.deliver("Me", messages[i]); - } - - TestSubscription s1 = new TestSubscription("1"); - TestSubscription s2 = new TestSubscription("2"); - _subscriptions.addSubscriber(s1); - _subscriptions.addSubscriber(s2); - - for (int i = batch; i < messages.length; i++) - { - _mgr.deliver("Me", messages[i]); - } - - assertTrue(s1.getMessages().isEmpty()); - assertTrue(s2.getMessages().isEmpty()); - - _mgr.processAsync(new OnCurrentThreadExecutor()); - - assertEquals(messages.length / 2, s1.getMessages().size()); - assertEquals(messages.length / 2, s2.getMessages().size()); - - for (int i = 0; i < messages.length; i++) - { - if (i % 2 == 0) - { - assertTrue(s1.getMessages().get(i / 2) == messages[i]); - } - else - { - assertTrue(s2.getMessages().get(i / 2) == messages[i]); - } - } - } - - @Test - public void startInDirectMode() throws AMQException - { - AMQMessage[] messages = new AMQMessage[10]; - for (int i = 0; i < messages.length; i++) - { - messages[i] = message(); - } - int batch = messages.length / 2; - - TestSubscription s1 = new TestSubscription("1"); - _subscriptions.addSubscriber(s1); - - for (int i = 0; i < batch; i++) - { - _mgr.deliver("Me", messages[i]); - } - - assertEquals(batch, s1.getMessages().size()); - for (int i = 0; i < batch; i++) - { - assertTrue(messages[i] == s1.getMessages().get(i)); - } - s1.getMessages().clear(); - assertEquals(0, s1.getMessages().size()); - - s1.setSuspended(true); - for (int i = batch; i < messages.length; i++) - { - _mgr.deliver("Me", messages[i]); - } - - _mgr.processAsync(new OnCurrentThreadExecutor()); - assertEquals(0, s1.getMessages().size()); - s1.setSuspended(false); - - _mgr.processAsync(new OnCurrentThreadExecutor()); - assertEquals(messages.length - batch, s1.getMessages().size()); - - for (int i = batch; i < messages.length; i++) - { - assertTrue(messages[i] == s1.getMessages().get(i - batch)); - } - - } - - @Test(expected = NoConsumersException.class) - public void noConsumers() throws AMQException - { - AMQMessage msg = message(true); - _mgr.deliver("Me", msg); - msg.checkDeliveredToConsumer(); - } - - @Test(expected = NoConsumersException.class) - public void noActiveConsumers() throws AMQException - { - TestSubscription s = new TestSubscription("A"); - _subscriptions.addSubscriber(s); - s.setSuspended(true); - AMQMessage msg = message(true); - _mgr.deliver("Me", msg); - msg.checkDeliveredToConsumer(); - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(DeliveryManagerTest.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/org/apache/qpid/server/queue/MessageTestHelper.java deleted file mode 100644 index be40b3c681..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/MessageTestHelper.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.qpid.AMQException; - -class MessageTestHelper -{ - private final MessageStore _messageStore = new SkeletonMessageStore(); - - MessageTestHelper() throws Exception - { - ApplicationRegistry.initialise(new TestApplicationRegistry()); - } - - AMQMessage message() throws AMQException - { - return message(false); - } - - AMQMessage message(boolean immediate) throws AMQException - { - BasicPublishBody publish = new BasicPublishBody(); - publish.immediate = immediate; - return new AMQMessage(_messageStore, publish, new ContentHeaderBody(), null); - } - -} diff --git a/java/systests/src/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/org/apache/qpid/server/queue/MockProtocolSession.java deleted file mode 100644 index bb8fd5bc19..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/MockProtocolSession.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.MessageStore; - -import javax.security.sasl.SaslServer; -import java.util.HashMap; -import java.util.Map; - -/** - * A protocol session that can be used for testing purposes. - */ -public class MockProtocolSession implements AMQProtocolSession -{ - private MessageStore _messageStore; - - private Map _channelMap = new HashMap(); - - public MockProtocolSession(MessageStore messageStore) - { - _messageStore = messageStore; - } - - public void dataBlockReceived(AMQDataBlock message) throws Exception - { - } - - public void writeFrame(AMQDataBlock frame) - { - } - - public String getContextKey() - { - return null; - } - - public void setContextKey(String contextKey) - { - } - - public AMQChannel getChannel(int channelId) - { - AMQChannel channel = _channelMap.get(channelId); - if (channel == null) - { - throw new IllegalArgumentException("Invalid channel id: " + channelId); - } - else - { - return channel; - } - } - - public void addChannel(AMQChannel channel) - { - if (channel == null) - { - throw new IllegalArgumentException("Channel must not be null"); - } - else - { - _channelMap.put(channel.getChannelId(), channel); - } - } - - public void closeChannel(int channelId) throws AMQException - { - } - - public void removeChannel(int channelId) - { - _channelMap.remove(channelId); - } - - public void initHeartbeats(int delay) - { - } - - public void closeSession() throws AMQException - { - } - - public Object getKey() - { - return null; - } - - public String getLocalFQDN() - { - return null; - } - - public SaslServer getSaslServer() - { - return null; - } - - public void setSaslServer(SaslServer saslServer) - { - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/java/systests/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java deleted file mode 100644 index 11c0026455..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.ConcurrentTest; - -public class QueueConcurrentPerfTest extends QueuePerfTest -{ - QueueConcurrentPerfTest(Factory factory, int queueCount, int messages) - { - super(factory, queueCount, messages); - } - - public static void main(String[] argv) throws Exception - { - Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT}; - int iterations = 5; - String label = argv.length > 0 ? argv[0]: null; - System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); - //vary number of queues: - for(Factory f : factories) - { - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5)); - } - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/QueuePerfTest.java b/java/systests/src/org/apache/qpid/server/queue/QueuePerfTest.java deleted file mode 100644 index 5b3857396d..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/QueuePerfTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.TimedRun; -import org.apache.qpid.server.util.RunStats; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class QueuePerfTest extends TimedRun -{ - private final Factory _factory; - private final int _queueCount; - private final int _messages; - private final String _msg = ""; - private List> _queues; - - QueuePerfTest(Factory factory, int queueCount, int messages) - { - super(factory + ", " + queueCount + ", " + messages); - _factory = factory; - _queueCount = queueCount; - _messages = messages; - } - - protected void setup() throws Exception - { - //init - int count = Integer.getInteger("prepopulate", 0); -// System.err.println("Prepopulating with " + count + " items"); - _queues = new ArrayList>(_queueCount); - for (int i = 0; i < _queueCount; i++) - { - Queue q = _factory.create(); - for(int j = 0; j < count; ++j) - { - q.add("Item"+ j); - } - _queues.add(q); - } - System.gc(); - } - - protected void teardown() throws Exception - { - System.gc(); - } - - protected void run() throws Exception - { - //dispatch - for (int i = 0; i < _messages; i++) - { - for (Queue q : _queues) - { - q.offer(_msg); - q.poll(); - } - } - } - - static interface Factory - { - Queue create(); - } - - static Factory CONCURRENT = new Factory() - { - public Queue create() - { - return new ConcurrentLinkedQueue(); - } - - public String toString() - { - return "ConcurrentLinkedQueue"; - } - - }; - - static Factory SYNCHRONIZED = new Factory() - { - public Queue create() - { - return new SynchronizedQueue(new LinkedList()); - } - - - public String toString() - { - return "Synchronized LinkedList"; - } - }; - - static Factory PLAIN = new Factory() - { - public Queue create() - { - return new LinkedList(); - } - - public String toString() - { - return "Plain LinkedList"; - } - }; - - static class SynchronizedQueue implements Queue - { - private final Queue queue; - - SynchronizedQueue(Queue queue) - { - this.queue = queue; - } - - public synchronized E element() - { - return queue.element(); - } - - public synchronized boolean offer(E o) - { - return queue.offer(o); - } - - public synchronized E peek() - { - return queue.peek(); - } - - public synchronized E poll() - { - return queue.poll(); - } - - public synchronized E remove() - { - return queue.remove(); - } - - public synchronized int size() - { - return queue.size(); - } - - public synchronized boolean isEmpty() - { - return queue.isEmpty(); - } - - public synchronized boolean contains(Object o) - { - return queue.contains(o); - } - - public synchronized Iterator iterator() - { - return queue.iterator(); - } - - public synchronized Object[] toArray() - { - return queue.toArray(); - } - - public synchronized T[] toArray(T[] a) - { - return queue.toArray(a); - } - - public synchronized boolean add(E o) - { - return queue.add(o); - } - - public synchronized boolean remove(Object o) - { - return queue.remove(o); - } - - public synchronized boolean containsAll(Collection c) - { - return queue.containsAll(c); - } - - public synchronized boolean addAll(Collection c) - { - return queue.addAll(c); - } - - public synchronized boolean removeAll(Collection c) - { - return queue.removeAll(c); - } - - public synchronized boolean retainAll(Collection c) - { - return queue.retainAll(c); - } - - public synchronized void clear() - { - queue.clear(); - } - } - - static void run(String label, AveragedRun test) throws Exception - { - RunStats stats = test.call(); - System.out.println((label == null ? "" : label + ", ") + test - + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin()); - } - - public static void main(String[] argv) throws Exception - { - Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT}; - int iterations = 5; - String label = argv.length > 0 ? argv[0]: null; - System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); - //vary number of queues: - - for(Factory f : factories) - { - run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations)); - } - } - -} diff --git a/java/systests/src/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/org/apache/qpid/server/queue/SendPerfTest.java deleted file mode 100644 index 6490b9f270..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/SendPerfTest.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.exchange.AbstractExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.MockIoSession; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.qpid.server.util.TimedRun; - -import java.util.ArrayList; -import java.util.List; - -public class SendPerfTest extends TimedRun -{ - private int _messages = 1000; - private int _clients = 10; - private List _queues; - - public SendPerfTest(int clients, int messages) - { - super("SendPerfTest, msgs=" + messages + ", clients=" + clients); - _messages = messages; - _clients = clients; - } - - protected void setup() throws Exception - { - _queues = initQueues(_clients); - System.gc(); - } - - protected void teardown() throws Exception - { - System.gc(); - } - - protected void run() throws Exception - { - deliver(_messages, _queues); - } - - //have a dummy AMQProtocolSession that does nothing on the writeFrame() - //set up x number of queues - //create necessary bits and pieces to deliver a message - //deliver y messages to each queue - - public static void main(String[] argv) throws Exception - { - ApplicationRegistry.initialise(new TestApplicationRegistry()); - int clients = Integer.parseInt(argv[0]); - int messages = Integer.parseInt(argv[1]); - int iterations = Integer.parseInt(argv[2]); - AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations); - test.run(); - } - - /** - * Delivers messages to a number of queues. - * @param count the number of messages to deliver - * @param queues the list of queues - * @throws NoConsumersException - */ - static void deliver(int count, List queues) throws AMQException - { - BasicPublishBody publish = new BasicPublishBody(); - publish.exchange = new NullExchange().getName(); - ContentHeaderBody header = new ContentHeaderBody(); - List body = new ArrayList(); - MessageStore messageStore = new SkeletonMessageStore(); - body.add(new ContentBody()); - for (int i = 0; i < count; i++) - { - for (AMQQueue q : queues) - { - q.deliver(new AMQMessage(messageStore, i, publish, header, body)); - } - } - } - - static List initQueues(int number) throws AMQException - { - Exchange exchange = new NullExchange(); - List queues = new ArrayList(number); - for (int i = 0; i < number; i++) - { - AMQQueue q = createQueue("Queue" + (i + 1)); - q.bind("routingKey", exchange); - try - { - q.registerProtocolSession(createSession(), 1, "1", false); - } - catch (Exception e) - { - throw new AMQException("Error creating protocol session: " + e, e); - } - queues.add(q); - } - return queues; - } - - static AMQQueue createQueue(String name) throws AMQException - { - return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(), - new OnCurrentThreadExecutor()); - } - - static AMQProtocolSession createSession() throws Exception - { - IApplicationRegistry reg = ApplicationRegistry.getInstance(); - AMQCodecFactory codecFactory = new AMQCodecFactory(true); - AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory); - result.addChannel(new AMQChannel(1, null, null)); - return result; - } - - static class NullExchange extends AbstractExchange - { - public String getName() - { - return "NullExchange"; - } - - protected ExchangeMBean createMBean() - { - return null; - } - - public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - } - - public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException - { - } - - public void route(AMQMessage payload) throws AMQException - { - } - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/org/apache/qpid/server/queue/SubscriptionManagerTest.java deleted file mode 100644 index fe0b686c1a..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/SubscriptionManagerTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import org.junit.Test; -import junit.framework.JUnit4TestAdapter; - -public class SubscriptionManagerTest -{ - private final SubscriptionSet mgr = new SubscriptionSet(); - - @Test - public void basicSubscriptionManagement() - { - assertTrue(mgr.isEmpty()); - assertFalse(mgr.hasActiveSubscribers()); - TestSubscription s1 = new TestSubscription("S1"); - mgr.addSubscriber(s1); - assertFalse(mgr.isEmpty()); - assertTrue(mgr.hasActiveSubscribers()); - - TestSubscription s2 = new TestSubscription("S2"); - mgr.addSubscriber(s2); - - s2.setSuspended(true); - assertFalse(mgr.isEmpty()); - assertTrue(mgr.hasActiveSubscribers()); - assertTrue(s2.isSuspended()); - assertFalse(s1.isSuspended()); - - s1.setSuspended(true); - assertFalse(mgr.hasActiveSubscribers()); - - mgr.removeSubscriber(new TestSubscription("S1")); - assertFalse(mgr.isEmpty()); - mgr.removeSubscriber(new TestSubscription("S2")); - assertTrue(mgr.isEmpty()); - } - - @Test - public void roundRobin() - { - TestSubscription a = new TestSubscription("A"); - TestSubscription b = new TestSubscription("B"); - TestSubscription c = new TestSubscription("C"); - TestSubscription d = new TestSubscription("D"); - mgr.addSubscriber(a); - mgr.addSubscriber(b); - mgr.addSubscriber(c); - mgr.addSubscriber(d); - - for (int i = 0; i < 3; i++) - { - assertEquals(a, mgr.nextSubscriber(null)); - assertEquals(b, mgr.nextSubscriber(null)); - assertEquals(c, mgr.nextSubscriber(null)); - assertEquals(d, mgr.nextSubscriber(null)); - } - - c.setSuspended(true); - - for (int i = 0; i < 3; i++) - { - assertEquals(a, mgr.nextSubscriber(null)); - assertEquals(b, mgr.nextSubscriber(null)); - assertEquals(d, mgr.nextSubscriber(null)); - } - - mgr.removeSubscriber(a); - d.setSuspended(true); - c.setSuspended(false); - Subscription e = new TestSubscription("D"); - mgr.addSubscriber(e); - - for (int i = 0; i < 3; i++) - { - assertEquals(b, mgr.nextSubscriber(null)); - assertEquals(c, mgr.nextSubscriber(null)); - assertEquals(e, mgr.nextSubscriber(null)); - } - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(SubscriptionManagerTest.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/org/apache/qpid/server/queue/SubscriptionSetTest.java deleted file mode 100644 index e6b0f7e885..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/SubscriptionSetTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.JUnit4TestAdapter; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import org.junit.Test; - -public class SubscriptionSetTest -{ - /** - * A SubscriptionSet that counts the number of items scanned. - */ - static class TestSubscriptionSet extends SubscriptionSet - { - private int scanned = 0; - - void resetScanned() - { - scanned = 0; - } - - protected void subscriberScanned() - { - ++scanned; - } - - int getScanned() - { - return scanned; - } - } - - final TestSubscription sub1 = new TestSubscription("1"); - final TestSubscription sub2 = new TestSubscription("2"); - final TestSubscription sub3 = new TestSubscription("3"); - - final TestSubscription suspendedSub1 = new TestSubscription("sus1", true); - final TestSubscription suspendedSub2 = new TestSubscription("sus2", true); - final TestSubscription suspendedSub3 = new TestSubscription("sus3", true); - - @Test - public void nextMessage() - { - SubscriptionSet ss = new SubscriptionSet(); - assertNull(ss.nextSubscriber(null)); - assertEquals(0, ss.getCurrentSubscriber()); - - ss.addSubscriber(sub1); - assertEquals(sub1, ss.nextSubscriber(null)); - assertEquals(1, ss.getCurrentSubscriber()); - assertEquals(sub1, ss.nextSubscriber(null)); - assertEquals(1, ss.getCurrentSubscriber()); - - ss.addSubscriber(sub2); - ss.addSubscriber(sub3); - - assertEquals(sub2, ss.nextSubscriber(null)); - assertEquals(2, ss.getCurrentSubscriber()); - - assertEquals(sub3, ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - } - - @Test - public void nextMessageWhenAllSuspended() - { - SubscriptionSet ss = createAllSuspendedSubscriptionSet(); - assertNull(ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - - assertNull(ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - } - - private TestSubscriptionSet createAllSuspendedSubscriptionSet() - { - TestSubscriptionSet ss = new TestSubscriptionSet(); - ss.addSubscriber(suspendedSub1); - ss.addSubscriber(suspendedSub2); - ss.addSubscriber(suspendedSub3); - return ss; - } - - @Test - public void nextMessageAfterRemove() - { - SubscriptionSet ss = new SubscriptionSet(); - ss.addSubscriber(suspendedSub1); - ss.addSubscriber(suspendedSub2); - ss.addSubscriber(sub3); - assertEquals(sub3, ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - - assertEquals(suspendedSub1, ss.removeSubscriber(suspendedSub1)); - - assertEquals(sub3, ss.nextSubscriber(null)); // Current implementation handles OutOfBoundsException here. - assertEquals(2, ss.getCurrentSubscriber()); - } - - @Test - public void nextMessageOverScanning() - { - TestSubscriptionSet ss = new TestSubscriptionSet(); - TestSubscription sub = new TestSubscription("test"); - ss.addSubscriber(suspendedSub1); - ss.addSubscriber(sub); - ss.addSubscriber(suspendedSub3); - assertEquals(sub, ss.nextSubscriber(null)); - assertEquals(2, ss.getCurrentSubscriber()); - assertEquals(2, ss.getScanned()); - - ss.resetScanned(); - sub.setSuspended(true); - assertNull(ss.nextSubscriber(null)); - assertEquals(3, ss.getCurrentSubscriber()); - // Current implementation overscans by one item here. - assertEquals(ss.size() + 1, ss.getScanned()); - } - - @Test - public void nextMessageOverscanWorstCase() { - TestSubscriptionSet ss = createAllSuspendedSubscriptionSet(); - ss.nextSubscriber(null); - // Scans the subscriptions twice. - assertEquals(ss.size() * 2, ss.getScanned()); - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(SubscriptionSetTest.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java deleted file mode 100644 index faa6bdcbe7..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.queue.SynchronizedDeliveryManager; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.ConcurrentDeliveryManager; -import org.apache.qpid.server.queue.DeliveryManagerTest; -import org.apache.qpid.AMQException; -import junit.framework.JUnit4TestAdapter; - -public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest -{ - public SynchronizedDeliveryManagerTest() throws Exception - { - try - { - System.setProperty("concurrentdeliverymanager","false"); - _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, - new DefaultQueueRegistry())); - } - catch (Throwable t) - { - t.printStackTrace(); - throw new AMQException("Could not initialise delivery manager", t); - } - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(SynchronizedDeliveryManagerTest.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/TestSubscription.java b/java/systests/src/org/apache/qpid/server/queue/TestSubscription.java deleted file mode 100644 index de6df4bc03..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/TestSubscription.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import java.util.ArrayList; -import java.util.List; - -public class TestSubscription implements Subscription -{ - private final List messages; - private final Object key; - private boolean isSuspended; - - public TestSubscription(Object key) - { - this(key, new ArrayList()); - } - - public TestSubscription(final Object key, final boolean isSuspended) - { - this(key); - setSuspended(isSuspended); - } - - TestSubscription(Object key, List messages) - { - this.key = key; - this.messages = messages; - } - - List getMessages() - { - return messages; - } - - public void send(AMQMessage msg, AMQQueue queue) - { - messages.add(msg); - } - - public void setSuspended(boolean suspended) - { - isSuspended = suspended; - } - - public boolean isSuspended() - { - return isSuspended; - } - - public void queueDeleted(AMQQueue queue) - { - } - - public int hashCode() - { - return key.hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof TestSubscription && ((TestSubscription) o).key.equals(key); - } - - public String toString() - { - return key.toString(); - } -} diff --git a/java/systests/src/org/apache/qpid/server/queue/UnitTests.java b/java/systests/src/org/apache/qpid/server/queue/UnitTests.java deleted file mode 100644 index fbdcaf7467..0000000000 --- a/java/systests/src/org/apache/qpid/server/queue/UnitTests.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.JUnit4TestAdapter; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -@RunWith(Suite.class) -@Suite.SuiteClasses({ - AckTest.class, - DeliveryManagerTest.class, - SubscriptionManagerTest.class, - SubscriptionSetTest.class, - ConcurrencyTest.class} -) -public class UnitTests -{ - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(UnitTests.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/org/apache/qpid/server/store/SkeletonMessageStore.java deleted file mode 100644 index bc0a8a7d64..0000000000 --- a/java/systests/src/org/apache/qpid/server/store/SkeletonMessageStore.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.AMQException; -import org.apache.commons.configuration.Configuration; - -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -/** - * A message store that does nothing. Designed to be used in tests that do not want to use any message store - * functionality. - */ -public class SkeletonMessageStore implements MessageStore -{ - private final AtomicLong _messageId = new AtomicLong(1); - - public void configure(String base, Configuration config) throws Exception - { - } - - public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception - { - } - - public void close() throws Exception - { - } - - public void put(AMQMessage msg) - { - } - - public void removeMessage(long messageId) - { - } - - public void createQueue(AMQQueue queue) throws AMQException - { - } - - public void removeQueue(String name) throws AMQException - { - } - - public void enqueueMessage(String name, long messageId) throws AMQException - { - } - - public void dequeueMessage(String name, long messageId) throws AMQException - { - } - - public void beginTran() throws AMQException - { - } - - public boolean inTran() - { - return false; - } - - public void commitTran() throws AMQException - { - } - - public void abortTran() throws AMQException - { - } - - public List createQueues() throws AMQException - { - return null; - } - - public long getNewMessageId() - { - return _messageId.getAndIncrement(); - } -} diff --git a/java/systests/src/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/org/apache/qpid/server/store/TestReferenceCounting.java deleted file mode 100644 index 7431e6e4cc..0000000000 --- a/java/systests/src/org/apache/qpid/server/store/TestReferenceCounting.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import junit.framework.JUnit4TestAdapter; -import org.junit.Test; -import org.junit.Assert; -import org.junit.Before; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.AMQException; - -/** - * Tests that reference counting works correctly with AMQMessage and the message store - */ -public class TestReferenceCounting -{ - private TestableMemoryMessageStore _store; - - @Before - public void createCommonObjects() - { - _store = new TestableMemoryMessageStore(); - } - - /** - * Check that when the reference count is decremented the message removes itself from the store - */ - @Test - public void testMessageGetsRemoved() throws AMQException - { - AMQMessage message = new AMQMessage(_store, null); - _store.put(message); - Assert.assertTrue(_store.getMessageMap().size() == 1); - message.decrementReference(); - Assert.assertTrue(_store.getMessageMap().size() == 0); - } - - @Test - public void testMessageRemains() throws AMQException - { - AMQMessage message = new AMQMessage(_store, null); - _store.put(message); - Assert.assertTrue(_store.getMessageMap().size() == 1); - message.incrementReference(); - message.decrementReference(); - Assert.assertTrue(_store.getMessageMap().size() == 1); - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(TestReferenceCounting.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java deleted file mode 100644 index be7687a22c..0000000000 --- a/java/systests/src/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.queue.AMQMessage; - -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Adds some extra methods to the memory message store for testing purposes. - */ -public class TestableMemoryMessageStore extends MemoryMessageStore -{ - public TestableMemoryMessageStore() - { - _messageMap = new ConcurrentHashMap(); - } - - public ConcurrentMap getMessageMap() - { - return _messageMap; - } -} diff --git a/java/systests/src/org/apache/qpid/server/store/UnitTests.java b/java/systests/src/org/apache/qpid/server/store/UnitTests.java deleted file mode 100644 index 8e11f823a9..0000000000 --- a/java/systests/src/org/apache/qpid/server/store/UnitTests.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import junit.framework.JUnit4TestAdapter; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -@RunWith(Suite.class) -@Suite.SuiteClasses({ - TestReferenceCounting.class -}) -public class UnitTests -{ - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(UnitTests.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/org/apache/qpid/server/txn/TxnBufferTest.java deleted file mode 100644 index f1b14b69b8..0000000000 --- a/java/systests/src/org/apache/qpid/server/txn/TxnBufferTest.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.txn; - -import junit.framework.JUnit4TestAdapter; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import org.junit.Before; -import org.junit.Test; -import org.junit.Ignore; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; - -import java.util.LinkedList; - -public class TxnBufferTest -{ - private final LinkedList ops = new LinkedList(); - - @Before - public void setup() throws Exception - { - } - - @Test - public void commit() throws AMQException - { - MockStore store = new MockStore(); - - TxnBuffer buffer = new TxnBuffer(store); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - //check relative ordering - MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit(); - buffer.enlist(op); - buffer.enlist(op); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - - buffer.commit(); - - validateOps(); - store.validate(); - } - - @Test - public void rollback() throws AMQException - { - MockStore store = new MockStore(); - - TxnBuffer buffer = new TxnBuffer(store); - buffer.enlist(new MockOp().expectRollback()); - buffer.enlist(new MockOp().expectRollback()); - buffer.enlist(new MockOp().expectRollback()); - - buffer.rollback(); - - validateOps(); - store.validate(); - } - - @Test - public void commitWithFailureDuringPrepare() throws AMQException - { - MockStore store = new MockStore(); - store.expectBegin().expectAbort(); - - TxnBuffer buffer = new TxnBuffer(store); - buffer.containsPersistentChanges(); - buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); - buffer.enlist(new TxnTester(store)); - buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); - buffer.enlist(new FailedPrepare()); - buffer.enlist(new MockOp()); - - buffer.commit(); - validateOps(); - store.validate(); - } - - @Test - public void commitWithPersistance() throws AMQException - { - MockStore store = new MockStore(); - store.expectBegin().expectCommit(); - - TxnBuffer buffer = new TxnBuffer(store); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.enlist(new TxnTester(store)); - buffer.containsPersistentChanges(); - - buffer.commit(); - validateOps(); - store.validate(); - } - - private void validateOps() - { - for(MockOp op : ops) - { - op.validate(); - } - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(TxnBufferTest.class); - } - - class MockOp implements TxnOp - { - final Object PREPARE = "PREPARE"; - final Object COMMIT = "COMMIT"; - final Object UNDO_PREPARE = "UNDO_PREPARE"; - final Object ROLLBACK = "ROLLBACK"; - - private final LinkedList expected = new LinkedList(); - - MockOp() - { - ops.add(this); - } - - public void prepare() - { - assertEquals(expected.removeLast(), PREPARE); - } - - public void commit() - { - assertEquals(expected.removeLast(), COMMIT); - } - - public void undoPrepare() - { - assertEquals(expected.removeLast(), UNDO_PREPARE); - } - - public void rollback() - { - assertEquals(expected.removeLast(), ROLLBACK); - } - - private MockOp expect(Object optype) - { - expected.addFirst(optype); - return this; - } - - MockOp expectPrepare() - { - return expect(PREPARE); - } - - MockOp expectCommit() - { - return expect(COMMIT); - } - - MockOp expectUndoPrepare() - { - return expect(UNDO_PREPARE); - } - - MockOp expectRollback() - { - return expect(ROLLBACK); - } - - void validate() - { - assertEquals("Expected ops were not all invoked", new LinkedList(), expected); - } - - void clear() - { - expected.clear(); - } - } - - class MockStore extends TestableMemoryMessageStore - { - final Object BEGIN = "BEGIN"; - final Object ABORT = "ABORT"; - final Object COMMIT = "COMMIT"; - - private final LinkedList expected = new LinkedList(); - private boolean inTran; - - public void beginTran() throws AMQException - { - assertEquals(expected.removeLast(), BEGIN); - inTran = true; - } - - public void commitTran() throws AMQException - { - assertEquals(expected.removeLast(), COMMIT); - inTran = false; - } - - public void abortTran() throws AMQException - { - assertEquals(expected.removeLast(), ABORT); - inTran = false; - } - - public boolean inTran() - { - return inTran; - } - - private MockStore expect(Object optype) - { - expected.addFirst(optype); - return this; - } - - MockStore expectBegin() - { - return expect(BEGIN); - } - - MockStore expectCommit() - { - return expect(COMMIT); - } - - MockStore expectAbort() - { - return expect(ABORT); - } - - void clear() - { - expected.clear(); - } - - void validate() - { - assertEquals("Expected ops were not all invoked", new LinkedList(), expected); - } - } - - class NullOp implements TxnOp - { - public void prepare() throws AMQException - { - } - public void commit() - { - } - public void undoPrepare() - { - } - public void rollback() - { - } - } - - class FailedPrepare extends NullOp - { - public void prepare() throws AMQException - { - throw new AMQException("Fail!"); - } - } - - class TxnTester extends NullOp - { - private final MessageStore store; - - TxnTester(MessageStore store) - { - this.store = store; - } - - public void prepare() throws AMQException - { - assertTrue("Expected prepare to be performed under txn", store.inTran()); - } - - public void commit() - { - assertTrue("Expected commit not to be performed under txn", !store.inTran()); - } - } - -} diff --git a/java/systests/src/org/apache/qpid/server/txn/UnitTests.java b/java/systests/src/org/apache/qpid/server/txn/UnitTests.java deleted file mode 100644 index 975e8eea72..0000000000 --- a/java/systests/src/org/apache/qpid/server/txn/UnitTests.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.txn; - -import junit.framework.JUnit4TestAdapter; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -@RunWith(Suite.class) -@Suite.SuiteClasses({ - TxnBufferTest.class -}) -public class UnitTests -{ - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(UnitTests.class); - } -} diff --git a/java/systests/src/org/apache/qpid/server/util/AveragedRun.java b/java/systests/src/org/apache/qpid/server/util/AveragedRun.java deleted file mode 100644 index 1d17985ab5..0000000000 --- a/java/systests/src/org/apache/qpid/server/util/AveragedRun.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.util; - -import org.apache.qpid.server.util.TimedRun; - -import java.util.concurrent.Callable; -import java.util.Collection; - -public class AveragedRun implements Callable -{ - private final RunStats stats = new RunStats(); - private final TimedRun test; - private final int iterations; - - public AveragedRun(TimedRun test, int iterations) - { - this.test = test; - this.iterations = iterations; - } - - public RunStats call() throws Exception - { - for (int i = 0; i < iterations; i++) - { - stats.record(test.call()); - } - return stats; - } - - public void run() throws Exception - { - System.out.println(test + ": " + call()); - } - - public String toString() - { - return test.toString(); - } - - static void run(Collection tests) throws Exception - { - for(AveragedRun test : tests) - { - test.run(); - } - } -} diff --git a/java/systests/src/org/apache/qpid/server/util/ConcurrentTest.java b/java/systests/src/org/apache/qpid/server/util/ConcurrentTest.java deleted file mode 100644 index 1ae8d3205d..0000000000 --- a/java/systests/src/org/apache/qpid/server/util/ConcurrentTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.util; - -public class ConcurrentTest extends TimedRun -{ - private final TimedRun _test; - private final Thread[] _threads; - - public ConcurrentTest(TimedRun test, int threads) - { - super(test.toString()); - _test = test; - _threads = new Thread[threads]; - } - - protected void setup() throws Exception - { - _test.setup(); - for(int i = 0; i < _threads.length; i++) - { - _threads[i] = new Thread(new Runner()); - } - } - - protected void teardown() throws Exception - { - _test.teardown(); - } - - protected void run() throws Exception - { - for(Thread t : _threads) - { - t.start(); - } - for(Thread t : _threads) - { - t.join(); - } - } - - private class Runner implements Runnable - { - private Exception error; - - public void run() - { - try - { - _test.run(); - } - catch(Exception e) - { - error = e; - e.printStackTrace(); - } - } - } - -} diff --git a/java/systests/src/org/apache/qpid/server/util/RunStats.java b/java/systests/src/org/apache/qpid/server/util/RunStats.java deleted file mode 100644 index ec67fc68b3..0000000000 --- a/java/systests/src/org/apache/qpid/server/util/RunStats.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.util; - -public class RunStats -{ - private long min = Long.MAX_VALUE; - private long max; - private long total; - private int count; - - public void record(long time) - { - max = Math.max(time, max); - min = Math.min(time, min); - total += time; - count++; - } - - public long getMin() - { - return min; - } - - public long getMax() - { - return max; - } - - public long getAverage() - { - return total / count; - } - - public String toString() - { - return "avg=" + getAverage() + ", min=" + min + ", max=" + max; - } -} diff --git a/java/systests/src/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/org/apache/qpid/server/util/TestApplicationRegistry.java deleted file mode 100644 index f801daf27c..0000000000 --- a/java/systests/src/org/apache/qpid/server/util/TestApplicationRegistry.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.util; - -import org.apache.qpid.server.exchange.DefaultExchangeFactory; -import org.apache.qpid.server.exchange.DefaultExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.management.ManagedObjectRegistry; -import org.apache.qpid.server.management.NoopManagedObjectRegistry; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.AuthenticationManager; -import org.apache.qpid.server.security.auth.NullAuthenticationManager; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.MapConfiguration; - -import java.util.HashMap; - -public class TestApplicationRegistry extends ApplicationRegistry -{ - private QueueRegistry _queueRegistry; - - private ExchangeRegistry _exchangeRegistry; - - private ExchangeFactory _exchangeFactory; - - private ManagedObjectRegistry _managedObjectRegistry; - - private AuthenticationManager _authenticationManager; - - private MessageStore _messageStore; - - public TestApplicationRegistry() - { - super(new MapConfiguration(new HashMap())); - } - - public void initialise() throws Exception - { - _managedObjectRegistry = new NoopManagedObjectRegistry(); - _queueRegistry = new DefaultQueueRegistry(); - _exchangeFactory = new DefaultExchangeFactory(); - _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory); - _authenticationManager = new NullAuthenticationManager(); - _messageStore = new TestableMemoryMessageStore(); - - _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes - } - - public Configuration getConfiguration() - { - return _configuration; - } - - public QueueRegistry getQueueRegistry() - { - return _queueRegistry; - } - - public ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - public ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; - } - - public ManagedObjectRegistry getManagedObjectRegistry() - { - return _managedObjectRegistry; - } - - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - - public MessageStore getMessageStore() - { - return _messageStore; - } -} - diff --git a/java/systests/src/org/apache/qpid/server/util/TimedRun.java b/java/systests/src/org/apache/qpid/server/util/TimedRun.java deleted file mode 100644 index 1291380311..0000000000 --- a/java/systests/src/org/apache/qpid/server/util/TimedRun.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.util; - -import java.util.concurrent.Callable; - -public abstract class TimedRun implements Callable -{ - private final String description; - - public TimedRun(String description) - { - this.description = description; - } - - public Long call() throws Exception - { - setup(); - long start = System.currentTimeMillis(); - run(); - long stop = System.currentTimeMillis(); - teardown(); - return stop - start; - } - - public String toString() - { - return description; - } - - protected void setup() throws Exception{} - protected void teardown() throws Exception{} - protected abstract void run() throws Exception; -} diff --git a/java/systests/src/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/java/systests/src/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java deleted file mode 100644 index 53d83fb6a3..0000000000 --- a/java/systests/src/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.ack; - -import junit.framework.JUnit4TestAdapter; -import org.apache.log4j.Logger; -import org.apache.log4j.xml.DOMConfigurator; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.TestApplicationRegistry; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import javax.jms.*; - -public class DisconnectAndRedeliverTest -{ - private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class); - - - static - { - String workdir = System.getProperty("QPID_WORK"); - if (workdir == null || workdir.equals("")) - { - String tempdir = System.getProperty("java.io.tmpdir"); - System.out.println("QPID_WORK not set using tmp directory: " + tempdir); - System.setProperty("QPID_WORK", tempdir); - } - //DOMConfigurator.configure("../etc/log4j.xml"); - DOMConfigurator.configure("broker/etc/log4j.xml"); - } - - @Before - public void resetAppliactionRegistry() throws Exception - { - createVMBroker(); - ApplicationRegistry.initialise(new TestApplicationRegistry(), 1); - } - - - public void createVMBroker() - { - try - { - TransportConnection.createVMBroker(1); - } - catch (AMQVMBrokerCreationException e) - { - Assert.fail("Unable to create broker: " + e); - } - } - - @After - public void stopVmBroker() - { - TransportConnection.killVMBroker(1); - } - - /** - * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when - * the channel is closed. - * - * @throws Exception - */ - @Test - public void disconnectRedeliversMessages() throws Exception - { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - - TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); - - Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - AMQQueue queue = new AMQQueue("someQ", "someQ", false, false); - MessageConsumer consumer = consumerSession.createConsumer(queue); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); - - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); - - - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. About to disconnect and reconnect"); - - con.close(); - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - - _logger.info("Starting second consumer connection"); - con.start(); - - tm = (TextMessage) consumer.receive(3000); - Assert.assertEquals("msg2", tm.getText()); - - - tm = (TextMessage) consumer.receive(3000); - Assert.assertEquals("msg3", tm.getText()); - - - tm = (TextMessage) consumer.receive(3000); - Assert.assertEquals("msg4", tm.getText()); - - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); - - con.close(); - - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - _logger.info("Starting third consumer connection"); - con.start(); - tm = (TextMessage) consumer.receiveNoWait(); - Assert.assertNull(tm); - _logger.info("No messages redelivered as is expected"); - con.close(); - - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - _logger.info("Starting fourth consumer connection"); - con.start(); - tm = (TextMessage) consumer.receive(3000); - Assert.assertNull(tm); - _logger.info("No messages redelivered as is expected"); - con.close(); - - _logger.info("Actually:" + store.getMessageMap().size()); - // Assert.assertTrue(store.getMessageMap().size() == 0); - } - - /** - * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be - * requeued (due perhaps to the queue being deleted). - * - * @throws Exception - */ - @Test - public void disconnectWithTransientQueueThrowsAwayMessages() throws Exception - { - - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue("someQ", "someQ", false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); - - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. About to disconnect and reconnect"); - - con.close(); - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - - _logger.info("Starting second consumer connection"); - con.start(); - - tm = (TextMessage) consumer.receiveNoWait(); - Assert.assertNull(tm); - _logger.info("No messages redelivered as is expected"); - - _logger.info("Actually:" + store.getMessageMap().size()); - Assert.assertTrue(store.getMessageMap().size() == 0); - con.close(); - } - - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(DisconnectAndRedeliverTest.class); - } -} diff --git a/java/systests/src/org/apache/qpid/test/unit/ack/UnitTests.java b/java/systests/src/org/apache/qpid/test/unit/ack/UnitTests.java deleted file mode 100644 index 498667e2bc..0000000000 --- a/java/systests/src/org/apache/qpid/test/unit/ack/UnitTests.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.ack; - -import junit.framework.JUnit4TestAdapter; -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - -@RunWith(Suite.class) -@Suite.SuiteClasses({DisconnectAndRedeliverTest.class}) -public class UnitTests -{ - public static junit.framework.Test suite() - { - return new JUnit4TestAdapter(UnitTests.class); - } -} diff --git a/java/systests/src/test/java/org/apache/qpid/server/UnitTests.java b/java/systests/src/test/java/org/apache/qpid/server/UnitTests.java new file mode 100644 index 0000000000..1f4af59ce7 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/UnitTests.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import junit.framework.JUnit4TestAdapter; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + org.apache.qpid.server.ack.UnitTests.class, + org.apache.qpid.server.exchange.UnitTests.class, + org.apache.qpid.server.protocol.UnitTests.class, + org.apache.qpid.server.queue.UnitTests.class, + org.apache.qpid.server.store.UnitTests.class, + org.apache.qpid.server.txn.UnitTests.class + }) +public class UnitTests +{ + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(UnitTests.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java new file mode 100644 index 0000000000..fe2c189c39 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -0,0 +1,189 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.ack; + +import junit.framework.JUnit4TestAdapter; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import org.junit.Ignore; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.store.TestableMemoryMessageStore; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; + +public class TxAckTest +{ + private Scenario individual; + private Scenario multiple; + private Scenario combined; + + @Before + public void setup() throws Exception + { + //ack only 5th msg + individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l)); + individual.update(5, false); + + //ack all up to and including 5th msg + multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l)); + multiple.update(5, true); + + //leave only 8th and 9th unacked + combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l)); + combined.update(3, false); + combined.update(5, true); + combined.update(7, true); + combined.update(2, true);//should be ignored + combined.update(1, false);//should be ignored + combined.update(10, false); + } + + @Test + public void prepare() throws AMQException + { + individual.prepare(); + multiple.prepare(); + combined.prepare(); + } + + @Test + public void undoPrepare() throws AMQException + { + individual.undoPrepare(); + multiple.undoPrepare(); + combined.undoPrepare(); + } + + @Test + public void commit() throws AMQException + { + individual.commit(); + multiple.commit(); + combined.commit(); + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(TxAckTest.class); + } + + private class Scenario + { + private final LinkedHashMap _messages = new LinkedHashMap(); + private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(_messages, _messages); + private final TxAck _op = new TxAck(_map); + private final List _acked; + private final List _unacked; + + Scenario(int messageCount, List acked, List unacked) + { + for(int i = 0; i < messageCount; i++) + { + long deliveryTag = i + 1; + _messages.put(deliveryTag, new UnacknowledgedMessage(null, new TestMessage(deliveryTag), null, deliveryTag)); + } + _acked = acked; + _unacked = unacked; + } + + void update(long deliverytag, boolean multiple) + { + _op.update(deliverytag, multiple); + } + + private void assertCount(List tags, int expected) + { + for(long tag : tags) + { + UnacknowledgedMessage u = _messages.get(tag); + assertTrue("Message not found for tag " + tag, u != null); + ((TestMessage) u.message).assertCountEquals(expected); + } + } + + void prepare() throws AMQException + { + _op.consolidate(); + _op.prepare(); + + assertCount(_acked, -1); + assertCount(_unacked, 0); + + } + void undoPrepare() + { + _op.consolidate(); + _op.undoPrepare(); + + assertCount(_acked, 1); + assertCount(_unacked, 0); + } + + void commit() + { + _op.consolidate(); + _op.commit(); + + + //check acked messages are removed from map + HashSet keys = new HashSet(_messages.keySet()); + keys.retainAll(_acked); + assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty()); + //check unacked messages are still in map + keys = new HashSet(_unacked); + keys.removeAll(_messages.keySet()); + assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty()); + } + } + + private class TestMessage extends AMQMessage + { + private final long _tag; + private int _count; + + TestMessage(long tag) + { + super(new TestableMemoryMessageStore(), null); + _tag = tag; + } + + public void incrementReference() + { + _count++; + } + + public void decrementReference() + { + _count--; + } + + void assertCountEquals(int expected) + { + assertEquals("Wrong count for message with tag " + _tag, expected, _count); + } + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/ack/UnitTests.java b/java/systests/src/test/java/org/apache/qpid/server/ack/UnitTests.java new file mode 100644 index 0000000000..45ffe1fd6c --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/ack/UnitTests.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.ack; + +import junit.framework.JUnit4TestAdapter; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + TxAckTest.class +}) +public class UnitTests +{ + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(UnitTests.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java new file mode 100644 index 0000000000..1bb1d12e1c --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTest.java @@ -0,0 +1,215 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.AMQException; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Set; +import java.util.HashSet; + +public class AbstractHeadersExchangeTest +{ + private final HeadersExchange exchange = new HeadersExchange(); + protected final Set queues = new HashSet(); + private int count; + + protected TestQueue bindDefault(String... bindings) throws AMQException + { + return bind("Queue" + (++count), bindings); + } + + protected TestQueue bind(String queueName, String... bindings) throws AMQException + { + return bind(queueName, getHeaders(bindings)); + } + + protected TestQueue bind(String queue, FieldTable bindings) throws AMQException + { + return bind(new TestQueue(queue), bindings); + } + + protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException + { + return bind(queue, getHeaders(bindings)); + } + + protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException + { + queues.add(queue); + exchange.registerQueue(null, queue, bindings); + return queue; + } + + + protected void route(Message m) throws AMQException + { + m.route(exchange); + } + + protected void routeAndTest(Message m, TestQueue... expected) throws AMQException + { + routeAndTest(m, Arrays.asList(expected)); + } + + protected void routeAndTest(Message m, List expected) throws AMQException + { + route(m); + for (TestQueue q : queues) + { + if (expected.contains(q)) + { + assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; + } + else + { + assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q)); + //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; + } + } + } + + static FieldTable getHeaders(String... entries) + { + FieldTable headers = new FieldTable(); + for (String s : entries) + { + String[] parts = s.split("=", 2); + headers.put(parts[0], parts.length > 1 ? parts[1] : ""); + } + return headers; + } + + static BasicPublishBody getPublishRequest(String id) + { + BasicPublishBody request = new BasicPublishBody(); + request.routingKey = id; + return request; + } + + static ContentHeaderBody getContentHeader(FieldTable headers) + { + ContentHeaderBody header = new ContentHeaderBody(); + header.properties = getProperties(headers); + return header; + } + + static BasicContentHeaderProperties getProperties(FieldTable headers) + { + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + properties.setHeaders(headers); + return properties; + } + + static class TestQueue extends AMQQueue + { + final List messages = new ArrayList(); + + public TestQueue(String name) throws AMQException + { + super(name, false, "test", true, ApplicationRegistry.getInstance().getQueueRegistry()); + } + + public void deliver(AMQMessage msg) throws AMQException + { + messages.add(new HeadersExchangeTest.Message(msg)); + } + } + + /** + * Just add some extra utility methods to AMQMessage to aid testing. + */ + static class Message extends AMQMessage + { + private static MessageStore _messageStore = new SkeletonMessageStore(); + + Message(String id, String... headers) throws AMQException + { + this(id, getHeaders(headers)); + } + + Message(String id, FieldTable headers) throws AMQException + { + this(getPublishRequest(id), getContentHeader(headers), null); + } + + private Message(BasicPublishBody publish, ContentHeaderBody header, List bodies) throws AMQException + { + super(_messageStore, publish, header, bodies); + } + + private Message(AMQMessage msg) throws AMQException + { + super(msg); + } + + void route(Exchange exchange) throws AMQException + { + exchange.route(this); + } + + boolean isInQueue(TestQueue queue) + { + return queue.messages.contains(this); + } + + public int hashCode() + { + return getKey().hashCode(); + } + + public boolean equals(Object o) + { + return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); + } + + private boolean equals(HeadersExchangeTest.Message m) + { + return getKey().equals(m.getKey()); + } + + public String toString() + { + return getKey().toString(); + } + + private Object getKey() + { + return getPublishBody().routingKey; + } + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java new file mode 100644 index 0000000000..ff0d58ad69 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java @@ -0,0 +1,184 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.NoConsumersException; +import org.apache.qpid.server.util.TimedRun; +import org.apache.qpid.server.util.AveragedRun; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.ContentBody; + +import java.util.List; + +/** + * Want to vary the number of regsitrations, messages and matches and measure + * the corresponding variance in execution time. + *

+ * Each registration will contain the 'All' header, even registrations will + * contain the 'Even' header and odd headers will contain the 'Odd' header. + * In additions each regsitration will have a unique value for the 'Specific' + * header as well. + *

+ * Messages can then be routed to all registrations, to even- or odd- registrations + * or to a specific registration. + * + */ +public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest +{ + private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC} + + private final TestQueue[] queues; + private final Mode mode; + + public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException + { + this.mode = mode; + queues = new TestQueue[registrations]; + for (int i = 0; i < queues.length; i++) + { + switch(mode) + { + case ALL: + queues[i] = bind(new FastQueue("Queue" + i), "All"); + break; + case ODD_OR_EVEN: + queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i)); + break; + case SPECIFIC: + queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i); + break; + } + } + } + + void sendToAll(int count) throws AMQException + { + send(count, "All=True"); + } + + void sendToOdd(int count) throws AMQException + { + send(count, "All=True", "Odd=True"); + } + + void sendToEven(int count) throws AMQException + { + send(count, "All=True", "Even=True"); + } + + void sendToAllSpecifically(int count) throws AMQException + { + for (int i = 0; i < queues.length; i++) + { + sendToSpecific(count, i); + } + } + + void sendToSpecific(int count, int index) throws AMQException + { + send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index); + } + + private void send(int count, String... headers) throws AMQException + { + for (int i = 0; i < count; i++) + { + route(new Message("Message" + i, headers)); + } + } + + private static String oddOrEven(int i) + { + return (i % 2 == 0 ? "Even" : "Odd"); + } + + static class FastQueue extends TestQueue + { + + public FastQueue(String name) throws AMQException + { + super(name); + } + + public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List contentBodies) throws NoConsumersException + { + //just discard as we are not testing routing functionality here + } + } + + static class Test extends TimedRun + { + private final Mode mode; + private final int registrations; + private final int count; + private HeadersExchangePerformanceTest test; + + Test(Mode mode, int registrations, int count) + { + super(mode + ", registrations=" + registrations + ", count=" + count); + this.mode = mode; + this.registrations = registrations; + this.count = count; + } + + protected void setup() throws Exception + { + test = new HeadersExchangePerformanceTest(mode, registrations); + run(100); //do a warm up run before times start + } + + protected void teardown() throws Exception + { + test = null; + System.gc(); + } + + protected void run() throws Exception + { + run(count); + } + + private void run(int count) throws Exception + { + switch(mode) + { + case ALL: + test.sendToAll(count); + break; + default: + System.out.println("Test for " + mode + " not yet implemented."); + } + } + } + + public static void main(String[] argv) throws Exception + { + int registrations = Integer.parseInt(argv[0]); + int messages = Integer.parseInt(argv[1]); + int iterations = Integer.parseInt(argv[2]); + TimedRun test = new Test(Mode.ALL, registrations, messages); + AveragedRun tests = new AveragedRun(test, iterations); + System.out.println(tests.call()); + } +} + diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java new file mode 100644 index 0000000000..7d80a6dce7 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import org.junit.Test; +import org.junit.Before; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; +import junit.framework.JUnit4TestAdapter; + +public class HeadersExchangeTest extends AbstractHeadersExchangeTest +{ + @Before + public void init() throws Exception + { + ApplicationRegistry.initialise(new TestApplicationRegistry()); + } + + @Test + public void simple() throws AMQException + { + TestQueue q1 = bindDefault("F0000"); + TestQueue q2 = bindDefault("F0000=Aardvark"); + TestQueue q3 = bindDefault("F0001"); + TestQueue q4 = bindDefault("F0001=Bear"); + TestQueue q5 = bindDefault("F0000", "F0001"); + TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear"); + TestQueue q7 = bindDefault("F0000", "F0001=Bear"); + TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); + TestQueue q9 = bindDefault("F0000=Apple", "F0001=Banana"); + TestQueue q10 = bindDefault("F0000=Apple", "F0001"); + + routeAndTest(new Message("Message1", "F0000"), q1); + routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2); + routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); + routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); + routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), + q1, q2, q3, q4, q5, q6, q7, q8); + routeAndTest(new Message("Message6", "F0002")); + } + + @Test + public void any() throws AMQException + { + TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any"); + TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any"); + TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any"); + TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); + TestQueue q5 = bindDefault("F0000=Apple", "F0001=Banana", "X-match=any"); + TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); + + routeAndTest(new Message("Message1", "F0000"), q1, q3); + routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4); + routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); + routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); + routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); + routeAndTest(new Message("Message6", "F0002")); + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(HeadersExchangeTest.class); + } + +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/exchange/UnitTests.java b/java/systests/src/test/java/org/apache/qpid/server/exchange/UnitTests.java new file mode 100644 index 0000000000..9ad3b65008 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/exchange/UnitTests.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import junit.framework.JUnit4TestAdapter; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({HeadersExchangeTest.class}) +public class UnitTests +{ + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(UnitTests.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java new file mode 100644 index 0000000000..28047832b8 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java @@ -0,0 +1,291 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.mina.common.*; +import org.apache.mina.common.support.DefaultCloseFuture; +import org.apache.mina.common.support.DefaultWriteFuture; + +import java.net.SocketAddress; +import java.util.Set; + +public class MockIoSession implements IoSession +{ + private AMQProtocolSession _protocolSession; + + /** + * Stores the last response written + */ + private Object _lastWrittenObject; + + private boolean _closing; + + public MockIoSession() + { + } + + public Object getLastWrittenObject() + { + return _lastWrittenObject; + } + + public IoService getService() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public IoHandler getHandler() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public IoSessionConfig getConfig() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public IoFilterChain getFilterChain() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public WriteFuture write(Object message) + { + WriteFuture wf = new DefaultWriteFuture(null); + _lastWrittenObject = message; + return wf; + } + + public CloseFuture close() + { + _closing = true; + CloseFuture cf = new DefaultCloseFuture(null); + cf.setClosed(); + return cf; + } + + public Object getAttachment() + { + return _protocolSession; + } + + public Object setAttachment(Object attachment) + { + Object current = _protocolSession; + _protocolSession = (AMQProtocolSession) attachment; + return current; + } + + public Object getAttribute(String key) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Object setAttribute(String key, Object value) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Object setAttribute(String key) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public Object removeAttribute(String key) + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean containsAttribute(String key) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public Set getAttributeKeys() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public TransportType getTransportType() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isConnected() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isClosing() + { + return _closing; + } + + public CloseFuture getCloseFuture() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public SocketAddress getRemoteAddress() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public SocketAddress getLocalAddress() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public SocketAddress getServiceAddress() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getIdleTime(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getIdleTimeInMillis(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setIdleTime(IdleStatus status, int idleTime) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public int getWriteTimeout() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getWriteTimeoutInMillis() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setWriteTimeout(int writeTimeout) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public TrafficMask getTrafficMask() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setTrafficMask(TrafficMask trafficMask) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void suspendRead() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void suspendWrite() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void resumeRead() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void resumeWrite() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public long getReadBytes() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getWrittenBytes() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getReadMessages() + { + return 0L; + } + + public long getWrittenMessages() + { + return 0L; + } + + public long getWrittenWriteRequests() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getScheduledWriteMessages() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getScheduledWriteBytes() + { + return 0; //TODO + } + + public long getCreationTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastIoTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastReadTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastWriteTime() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isIdle(IdleStatus status) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public int getIdleCount(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getLastIdleTime(IdleStatus status) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java new file mode 100644 index 0000000000..847c3cc51b --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java @@ -0,0 +1,215 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import junit.framework.Assert; +import junit.framework.JUnit4TestAdapter; +import org.apache.qpid.codec.AMQDecoder; +import org.apache.qpid.codec.AMQEncoder; +import org.apache.qpid.framing.*; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.WriteFuture; +import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.mina.filter.codec.ProtocolEncoderOutput; +import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; +import org.junit.Before; +import org.junit.Test; + +/** + * This test suite tests the handling of protocol initiation frames and related issues. + */ +public class TestProtocolInitiation implements ProtocolVersionList +{ + private AMQPFastProtocolHandler _protocolHandler; + + private MockIoSession _mockIoSession; + + /** + * We need to use the object encoder mechanism so to allow us to retrieve the + * output (a bytebuffer) we define our own encoder output class. The encoder + * writes the encoded data to this class, from where we can retrieve it during + * the test run. + */ + private class TestProtocolEncoderOutput implements ProtocolEncoderOutput + { + public ByteBuffer result; + + public void write(ByteBuffer buf) + { + result = buf; + } + + public void mergeAll() + { + throw new UnsupportedOperationException(); + } + + public WriteFuture flush() + { + throw new UnsupportedOperationException(); + } + } + + private class TestProtocolDecoderOutput implements ProtocolDecoderOutput + { + public Object result; + + public void write(Object buf) + { + result = buf; + } + + public void flush() + { + throw new UnsupportedOperationException(); + } + } + + @Before + public void createCommonObjects() + { + _mockIoSession = new MockIoSession(); + _protocolHandler = new AMQPFastProtocolHandler(null, null); + } + + + /** + * Tests that the AMQDecoder handles invalid protocol classes + * @throws Exception + */ + @Test(expected = AMQProtocolClassException.class) + public void testDecoderValidateProtocolClass() throws Exception + { + ProtocolInitiation pi = createValidProtocolInitiation(); + pi.protocolClass = 2; + decodePI(pi); + } + + /** + * Tests that the AMQDecoder handles invalid protocol instance numbers + * @throws Exception + */ + @Test(expected = AMQProtocolInstanceException.class) + public void testDecoderValidatesProtocolInstance() throws Exception + { + ProtocolInitiation pi = createValidProtocolInitiation(); + pi.protocolInstance = 2; + decodePI(pi); + } + + /** + * Tests that the AMQDecoder handles invalid protocol major + * @throws Exception + */ + @Test(expected = AMQProtocolVersionException.class) + public void testDecoderValidatesProtocolMajor() throws Exception + { + ProtocolInitiation pi = createValidProtocolInitiation(); + pi.protocolMajor = 2; + decodePI(pi); + } + + /** + * Tests that the AMQDecoder handles invalid protocol minor + * @throws Exception + */ + @Test(expected = AMQProtocolVersionException.class) + public void testDecoderValidatesProtocolMinor() throws Exception + { + ProtocolInitiation pi = createValidProtocolInitiation(); + pi.protocolMinor = 99; + decodePI(pi); + } + + /** + * Tests that the AMQDecoder accepts a valid PI + * @throws Exception + */ + @Test(expected = AMQProtocolHeaderException.class) + public void testDecoderValidatesHeader() throws Exception + { + ProtocolInitiation pi = createValidProtocolInitiation(); + pi.header = new char[] {'P', 'Q', 'M', 'A' }; + decodePI(pi); + } + + /** + * Test that a valid header is passed by the decoder. + * @throws Exception + */ + @Test + public void testDecoderAcceptsValidHeader() throws Exception + { + ProtocolInitiation pi = createValidProtocolInitiation(); + decodePI(pi); + } + + /** + * This test checks that an invalid protocol header results in the + * connection being closed. + */ + @Test + public void testInvalidProtocolHeaderClosesConnection() throws Exception + { + AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test"); + _protocolHandler.exceptionCaught(_mockIoSession, pe); + Assert.assertNotNull(_mockIoSession.getLastWrittenObject()); + Object piResponse = _mockIoSession.getLastWrittenObject(); + Assert.assertEquals(piResponse.getClass(), ProtocolInitiation.class); + ProtocolInitiation pi = (ProtocolInitiation) piResponse; + Assert.assertEquals("Protocol Initiation sent out was not the broker's expected header", pi, + createValidProtocolInitiation()); + Assert.assertTrue("Session has not been closed", _mockIoSession.isClosing()); + } + + private ProtocolInitiation createValidProtocolInitiation() + { + /* Find last protocol version in protocol version list. Make sure last protocol version + listed in the build file (build-module.xml) is the latest version which will be used + here. */ + int i = pv.length - 1; + return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]); + } + + /** + * Helper that encodes a protocol initiation and attempts to decode it + * @param pi + * @throws Exception + */ + private void decodePI(ProtocolInitiation pi) throws Exception + { + // we need to do this test at the level of the decoder since we initially only expect PI frames + // so the protocol handler is not set up to know whether it should be expecting a PI frame or + // a different type of frame + AMQDecoder decoder = new AMQDecoder(true); + AMQEncoder encoder = new AMQEncoder(); + TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput(); + encoder.encode(_mockIoSession, pi, peo); + TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput(); + decoder.decode(_mockIoSession, peo.result, pdo); + ((ProtocolInitiation) pdo.result).checkVersion(this); + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(TestProtocolInitiation.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/protocol/UnitTests.java b/java/systests/src/test/java/org/apache/qpid/server/protocol/UnitTests.java new file mode 100644 index 0000000000..53038a4058 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/protocol/UnitTests.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import junit.framework.JUnit4TestAdapter; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({TestProtocolInitiation.class}) +public class UnitTests +{ + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(UnitTests.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java new file mode 100644 index 0000000000..fe04bd43e1 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java @@ -0,0 +1,324 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.JUnit4TestAdapter; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.ack.UnacknowledgedMessage; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.TestApplicationRegistry; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; + +import java.util.Iterator; +import java.util.Map; + +/** + * Tests that acknowledgements are handled correctly. + */ +public class AckTest +{ + private static final Logger _log = Logger.getLogger(AckTest.class); + + private SubscriptionImpl _subscription; + + private MockProtocolSession _protocolSession; + + private TestableMemoryMessageStore _messageStore; + + private AMQChannel _channel; + + private SubscriptionSet _subscriptionManager; + + private AMQQueue _queue; + + public AckTest() throws Exception + { + ApplicationRegistry.initialise(new TestApplicationRegistry()); + } + + @Before + public void setup() throws Exception + { + _messageStore = new TestableMemoryMessageStore(); + _channel = new AMQChannel(5, _messageStore, null/*dont need exchange registry*/); + _protocolSession = new MockProtocolSession(_messageStore); + _protocolSession.addChannel(_channel); + _subscriptionManager = new SubscriptionSet(); + _queue = new AMQQueue("myQ", false, "guest", true, new DefaultQueueRegistry(), _subscriptionManager); + } + + private void publishMessages(int count) throws AMQException + { + publishMessages(count, false); + } + + private void publishMessages(int count, boolean persistent) throws AMQException + { + for (int i = 1; i <= count; i++) + { + BasicPublishBody publishBody = new BasicPublishBody(); + publishBody.routingKey = "rk"; + publishBody.exchange = "someExchange"; + AMQMessage msg = new AMQMessage(_messageStore, publishBody); + if (persistent) + { + BasicContentHeaderProperties b = new BasicContentHeaderProperties(); + //This is DeliveryMode.PERSISTENT + b.setDeliveryMode((byte) 2); + ContentHeaderBody cb = new ContentHeaderBody(); + cb.properties = b; + msg.setContentHeaderBody(cb); + } + else + { + msg.setContentHeaderBody(new ContentHeaderBody()); + } + _subscription.send(msg, _queue); + } + } + + /** + * Tests that the acknowledgements are correctly associated with a channel and + * order is preserved when acks are enabled + */ + @Test + public void ackChannelAssociationTest() throws AMQException + { + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + final int msgCount = 10; + publishMessages(msgCount, true); + + Map map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == msgCount); + assertTrue(_messageStore.getMessageMap().size() == msgCount); + + Iterator> it = map.entrySet().iterator(); + for (int i = 1; i <= map.size(); i++) + { + Map.Entry entry = it.next(); + assertTrue(entry.getKey() == i); + UnacknowledgedMessage unackedMsg = entry.getValue(); + assertTrue(unackedMsg.queue == _queue); + } + + assertTrue(map.size() == msgCount); + assertTrue(_messageStore.getMessageMap().size() == msgCount); + } + + /** + * Tests that in no-ack mode no messages are retained + */ + @Test + public void testNoAckMode() throws AMQException + { + // false arg means no acks expected + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", false); + final int msgCount = 10; + publishMessages(msgCount); + + Map map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 0); + assertTrue(_messageStore.getMessageMap().size() == 0); + } + + /** + * Tests that a single acknowledgement is handled correctly (i.e multiple flag not + * set case) + */ + @Test + public void singleAckReceivedTest() throws AMQException + { + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + final int msgCount = 10; + publishMessages(msgCount); + + _channel.acknowledgeMessage(5, false); + Map map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == msgCount - 1); + + Iterator> it = map.entrySet().iterator(); + int i = 1; + while (i <= map.size()) + { + Map.Entry entry = it.next(); + assertTrue(entry.getKey() == i); + UnacknowledgedMessage unackedMsg = entry.getValue(); + assertTrue(unackedMsg.queue == _queue); + // 5 is the delivery tag of the message that *should* be removed + if (++i == 5) + { + ++i; + } + } + } + + /** + * Tests that a single acknowledgement is handled correctly (i.e multiple flag not + * set case) + */ + @Test + public void multiAckReceivedTest() throws AMQException + { + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + final int msgCount = 10; + publishMessages(msgCount); + + _channel.acknowledgeMessage(5, true); + Map map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 5); + + Iterator> it = map.entrySet().iterator(); + int i = 1; + while (i <= map.size()) + { + Map.Entry entry = it.next(); + assertTrue(entry.getKey() == i + 5); + UnacknowledgedMessage unackedMsg = entry.getValue(); + assertTrue(unackedMsg.queue == _queue); + ++i; + } + } + + /** + * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. + */ + @Test + public void multiAckAllReceivedTest() throws AMQException + { + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + final int msgCount = 10; + publishMessages(msgCount); + + _channel.acknowledgeMessage(0, true); + Map map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 0); + + Iterator> it = map.entrySet().iterator(); + int i = 1; + while (i <= map.size()) + { + Map.Entry entry = it.next(); + assertTrue(entry.getKey() == i + 5); + UnacknowledgedMessage unackedMsg = entry.getValue(); + assertTrue(unackedMsg.queue == _queue); + ++i; + } + } + + + @Test + public void testPrefetchHighLow() throws AMQException + { + int lowMark = 5; + int highMark = 10; + + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _channel.setPrefetchLowMarkCount(lowMark); + _channel.setPrefetchHighMarkCount(highMark); + + assertTrue(_channel.getPrefetchLowMarkCount() == lowMark); + assertTrue(_channel.getPrefetchHighMarkCount() == highMark); + + publishMessages(highMark); + + // at this point we should have sent out only highMark messages + // which have not bee received so will be queued up in the channel + // which should be suspended + assertTrue(_subscription.isSuspended()); + Map map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == highMark); + + //acknowledge messages so we are just above lowMark + _channel.acknowledgeMessage(lowMark - 1, true); + + //we should still be suspended + assertTrue(_subscription.isSuspended()); + assertTrue(map.size() == lowMark + 1); + + //acknowledge one more message + _channel.acknowledgeMessage(lowMark, true); + + //and suspension should be lifted + assertTrue(!_subscription.isSuspended()); + + //pubilsh more msgs so we are just below the limit + publishMessages(lowMark - 1); + + //we should not be suspended + assertTrue(!_subscription.isSuspended()); + + //acknowledge all messages + _channel.acknowledgeMessage(0, true); + try + { + Thread.sleep(3000); + } + catch (InterruptedException e) + { + _log.error("Error: " + e, e); + } + //map will be empty + assertTrue(map.size() == 0); + } + + @Test + public void testPrefetch() throws AMQException + { + _subscription = new SubscriptionImpl(5, _protocolSession, "conTag", true); + _channel.setPrefetchCount(5); + + assertTrue(_channel.getPrefetchCount() == 5); + + final int msgCount = 5; + publishMessages(msgCount); + + // at this point we should have sent out only 5 messages with a further 5 queued + // up in the channel which should now be suspended + assertTrue(_subscription.isSuspended()); + Map map = _channel.getUnacknowledgedMessageMap(); + assertTrue(map.size() == 5); + _channel.acknowledgeMessage(5, true); + assertTrue(!_subscription.isSuspended()); + try + { + Thread.sleep(3000); + } + catch (InterruptedException e) + { + _log.error("Error: " + e, e); + } + assertTrue(map.size() == 0); + } + + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(AckTest.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java new file mode 100644 index 0000000000..60d86a14a7 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java @@ -0,0 +1,264 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.JUnit4TestAdapter; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import org.junit.Test; +import org.junit.Assert; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.handler.OnCurrentThreadExecutor; + +import java.util.*; +import java.util.concurrent.Executor; + +/** + * Tests delivery in the face of concurrent incoming _messages, subscription alterations + * and attempts to asynchronously process queued _messages. + */ +public class ConcurrencyTest extends MessageTestHelper +{ + private final Random random = new Random(); + + private final int numMessages = 1000; + + private final List _subscribers = new ArrayList(); + private final Set _active = new HashSet(); + private final List _messages = new ArrayList(); + private int next = 0;//index to next message to send + private final List _received = Collections.synchronizedList(new ArrayList()); + private final Executor _executor = new OnCurrentThreadExecutor(); + private final List _threads = new ArrayList(); + + private final SubscriptionSet _subscriptionMgr = new SubscriptionSet(); + private final DeliveryManager _deliveryMgr; + + private boolean isComplete; + private boolean failed; + + public ConcurrencyTest() throws Exception + { + _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false, + new DefaultQueueRegistry())); + } + + @Test + public void concurrent1() throws InterruptedException, AMQException + { + initSubscriptions(10); + initMessages(numMessages); + initThreads(1, 4, 4, 4); + run(); + check(); + } + + @Test + public void concurrent2() throws InterruptedException, AMQException + { + initSubscriptions(10); + initMessages(numMessages); + initThreads(4, 2, 2, 2); + run(); + check(); + } + + void check() + { + assertFalse("Failed", failed); + + _deliveryMgr.processAsync(_executor); + + assertEquals("Did not recieve the correct number of messages", _messages.size(), _received.size()); + for(int i = 0; i < _messages.size(); i++) + { + assertEquals("Wrong message at " + i, _messages.get(i), _received.get(i)); + } + } + + void initSubscriptions(int subscriptions) + { + for(int i = 0; i < subscriptions; i++) + { + _subscribers.add(new TestSubscription("Subscriber" + i, _received)); + } + } + + void initMessages(int messages) throws AMQException + { + for(int i = 0; i < messages; i++) + { + _messages.add(message()); + } + } + + void initThreads(int senders, int subscribers, int suspenders, int processors) + { + addThreads(senders, senders == 1 ? new Sender() : new OrderedSender()); + addThreads(subscribers, new Subscriber()); + addThreads(suspenders, new Suspender()); + addThreads(processors, new Processor()); + } + + void addThreads(int count, Runnable runner) + { + for(int i = 0; i < count; i++) + { + _threads.add(new Thread(runner, runner.toString())); + } + } + + void run() throws InterruptedException + { + for(Thread t : _threads) + { + t.start(); + } + + for(Thread t : _threads) + { + t.join(); + } + } + + private void toggle(Subscription s) + { + synchronized (_active) + { + if (_active.contains(s)) + { + _active.remove(s); + Subscription result = _subscriptionMgr.removeSubscriber(s); + Assert.assertTrue("Removed subscription " + result + " but trying to remove subscription " + s, + result != null && result.equals(s)); + } + else + { + _active.add(s); + _subscriptionMgr.addSubscriber(s); + } + } + } + + private AMQMessage nextMessage() + { + synchronized (_messages) + { + if (next < _messages.size()) + { + return _messages.get(next++); + } + else + { + if (!_deliveryMgr.hasQueuedMessages()) { + isComplete = true; + } + return null; + } + } + } + + private boolean randomBoolean() + { + return random.nextBoolean(); + } + + private TestSubscription randomSubscriber() + { + return _subscribers.get(random.nextInt(_subscribers.size())); + } + + private class Sender extends Runner + { + void doRun() throws Throwable + { + AMQMessage msg = nextMessage(); + if (msg != null) + { + _deliveryMgr.deliver(toString(), msg); + } + } + } + + private class OrderedSender extends Sender + { + synchronized void doRun() throws Throwable + { + super.doRun(); + } + } + + private class Suspender extends Runner + { + void doRun() throws Throwable + { + randomSubscriber().setSuspended(randomBoolean()); + } + } + + private class Subscriber extends Runner + { + void doRun() throws Throwable + { + toggle(randomSubscriber()); + } + } + + private class Processor extends Runner + { + void doRun() throws Throwable + { + _deliveryMgr.processAsync(_executor); + } + } + + private abstract class Runner implements Runnable + { + public void run() + { + try + { + while (!stop()) + { + doRun(); + } + } + catch (Throwable t) + { + failed = true; + t.printStackTrace(); + } + } + + abstract void doRun() throws Throwable; + + boolean stop() + { + return isComplete || failed; + } + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(ConcurrencyTest.class); + } + +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java new file mode 100644 index 0000000000..9d841d86f5 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.queue.ConcurrentDeliveryManager; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.DeliveryManagerTest; +import org.apache.qpid.AMQException; +import junit.framework.JUnit4TestAdapter; + +public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest +{ + public ConcurrentDeliveryManagerTest() throws Exception + { + try + { + System.setProperty("concurrentdeliverymanager","true"); + _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, + new DefaultQueueRegistry())); + } + catch (Throwable t) + { + t.printStackTrace(); + throw new AMQException("Could not initialise delivery manager", t); + } + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(ConcurrentDeliveryManagerTest.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java new file mode 100644 index 0000000000..a6739223b0 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Test; +import org.junit.runners.Suite; +import org.junit.runner.RunWith; +import org.apache.qpid.server.handler.OnCurrentThreadExecutor; +import org.apache.qpid.AMQException; +import junit.framework.JUnit4TestAdapter; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + ConcurrentDeliveryManagerTest.class, + SynchronizedDeliveryManagerTest.class}) + +abstract public class DeliveryManagerTest extends MessageTestHelper +{ + protected final SubscriptionSet _subscriptions = new SubscriptionSet(); + protected DeliveryManager _mgr; + + public DeliveryManagerTest() throws Exception + { + } + + + @Test + public void startInQueueingMode() throws AMQException + { + AMQMessage[] messages = new AMQMessage[10]; + for (int i = 0; i < messages.length; i++) + { + messages[i] = message(); + } + int batch = messages.length / 2; + + for (int i = 0; i < batch; i++) + { + _mgr.deliver("Me", messages[i]); + } + + TestSubscription s1 = new TestSubscription("1"); + TestSubscription s2 = new TestSubscription("2"); + _subscriptions.addSubscriber(s1); + _subscriptions.addSubscriber(s2); + + for (int i = batch; i < messages.length; i++) + { + _mgr.deliver("Me", messages[i]); + } + + assertTrue(s1.getMessages().isEmpty()); + assertTrue(s2.getMessages().isEmpty()); + + _mgr.processAsync(new OnCurrentThreadExecutor()); + + assertEquals(messages.length / 2, s1.getMessages().size()); + assertEquals(messages.length / 2, s2.getMessages().size()); + + for (int i = 0; i < messages.length; i++) + { + if (i % 2 == 0) + { + assertTrue(s1.getMessages().get(i / 2) == messages[i]); + } + else + { + assertTrue(s2.getMessages().get(i / 2) == messages[i]); + } + } + } + + @Test + public void startInDirectMode() throws AMQException + { + AMQMessage[] messages = new AMQMessage[10]; + for (int i = 0; i < messages.length; i++) + { + messages[i] = message(); + } + int batch = messages.length / 2; + + TestSubscription s1 = new TestSubscription("1"); + _subscriptions.addSubscriber(s1); + + for (int i = 0; i < batch; i++) + { + _mgr.deliver("Me", messages[i]); + } + + assertEquals(batch, s1.getMessages().size()); + for (int i = 0; i < batch; i++) + { + assertTrue(messages[i] == s1.getMessages().get(i)); + } + s1.getMessages().clear(); + assertEquals(0, s1.getMessages().size()); + + s1.setSuspended(true); + for (int i = batch; i < messages.length; i++) + { + _mgr.deliver("Me", messages[i]); + } + + _mgr.processAsync(new OnCurrentThreadExecutor()); + assertEquals(0, s1.getMessages().size()); + s1.setSuspended(false); + + _mgr.processAsync(new OnCurrentThreadExecutor()); + assertEquals(messages.length - batch, s1.getMessages().size()); + + for (int i = batch; i < messages.length; i++) + { + assertTrue(messages[i] == s1.getMessages().get(i - batch)); + } + + } + + @Test(expected = NoConsumersException.class) + public void noConsumers() throws AMQException + { + AMQMessage msg = message(true); + _mgr.deliver("Me", msg); + msg.checkDeliveredToConsumer(); + } + + @Test(expected = NoConsumersException.class) + public void noActiveConsumers() throws AMQException + { + TestSubscription s = new TestSubscription("A"); + _subscriptions.addSubscriber(s); + s.setSuspended(true); + AMQMessage msg = message(true); + _mgr.deliver("Me", msg); + msg.checkDeliveredToConsumer(); + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(DeliveryManagerTest.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java new file mode 100644 index 0000000000..be40b3c681 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.AMQException; + +class MessageTestHelper +{ + private final MessageStore _messageStore = new SkeletonMessageStore(); + + MessageTestHelper() throws Exception + { + ApplicationRegistry.initialise(new TestApplicationRegistry()); + } + + AMQMessage message() throws AMQException + { + return message(false); + } + + AMQMessage message(boolean immediate) throws AMQException + { + BasicPublishBody publish = new BasicPublishBody(); + publish.immediate = immediate; + return new AMQMessage(_messageStore, publish, new ContentHeaderBody(), null); + } + +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java new file mode 100644 index 0000000000..bb8fd5bc19 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.store.MessageStore; + +import javax.security.sasl.SaslServer; +import java.util.HashMap; +import java.util.Map; + +/** + * A protocol session that can be used for testing purposes. + */ +public class MockProtocolSession implements AMQProtocolSession +{ + private MessageStore _messageStore; + + private Map _channelMap = new HashMap(); + + public MockProtocolSession(MessageStore messageStore) + { + _messageStore = messageStore; + } + + public void dataBlockReceived(AMQDataBlock message) throws Exception + { + } + + public void writeFrame(AMQDataBlock frame) + { + } + + public String getContextKey() + { + return null; + } + + public void setContextKey(String contextKey) + { + } + + public AMQChannel getChannel(int channelId) + { + AMQChannel channel = _channelMap.get(channelId); + if (channel == null) + { + throw new IllegalArgumentException("Invalid channel id: " + channelId); + } + else + { + return channel; + } + } + + public void addChannel(AMQChannel channel) + { + if (channel == null) + { + throw new IllegalArgumentException("Channel must not be null"); + } + else + { + _channelMap.put(channel.getChannelId(), channel); + } + } + + public void closeChannel(int channelId) throws AMQException + { + } + + public void removeChannel(int channelId) + { + _channelMap.remove(channelId); + } + + public void initHeartbeats(int delay) + { + } + + public void closeSession() throws AMQException + { + } + + public Object getKey() + { + return null; + } + + public String getLocalFQDN() + { + return null; + } + + public SaslServer getSaslServer() + { + return null; + } + + public void setSaslServer(SaslServer saslServer) + { + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java new file mode 100644 index 0000000000..11c0026455 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.util.AveragedRun; +import org.apache.qpid.server.util.ConcurrentTest; + +public class QueueConcurrentPerfTest extends QueuePerfTest +{ + QueueConcurrentPerfTest(Factory factory, int queueCount, int messages) + { + super(factory, queueCount, messages); + } + + public static void main(String[] argv) throws Exception + { + Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT}; + int iterations = 5; + String label = argv.length > 0 ? argv[0]: null; + System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); + //vary number of queues: + for(Factory f : factories) + { + run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5)); + run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5)); + run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5)); + run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5)); + run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5)); + } + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java new file mode 100644 index 0000000000..5b3857396d --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/QueuePerfTest.java @@ -0,0 +1,258 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.util.AveragedRun; +import org.apache.qpid.server.util.TimedRun; +import org.apache.qpid.server.util.RunStats; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class QueuePerfTest extends TimedRun +{ + private final Factory _factory; + private final int _queueCount; + private final int _messages; + private final String _msg = ""; + private List> _queues; + + QueuePerfTest(Factory factory, int queueCount, int messages) + { + super(factory + ", " + queueCount + ", " + messages); + _factory = factory; + _queueCount = queueCount; + _messages = messages; + } + + protected void setup() throws Exception + { + //init + int count = Integer.getInteger("prepopulate", 0); +// System.err.println("Prepopulating with " + count + " items"); + _queues = new ArrayList>(_queueCount); + for (int i = 0; i < _queueCount; i++) + { + Queue q = _factory.create(); + for(int j = 0; j < count; ++j) + { + q.add("Item"+ j); + } + _queues.add(q); + } + System.gc(); + } + + protected void teardown() throws Exception + { + System.gc(); + } + + protected void run() throws Exception + { + //dispatch + for (int i = 0; i < _messages; i++) + { + for (Queue q : _queues) + { + q.offer(_msg); + q.poll(); + } + } + } + + static interface Factory + { + Queue create(); + } + + static Factory CONCURRENT = new Factory() + { + public Queue create() + { + return new ConcurrentLinkedQueue(); + } + + public String toString() + { + return "ConcurrentLinkedQueue"; + } + + }; + + static Factory SYNCHRONIZED = new Factory() + { + public Queue create() + { + return new SynchronizedQueue(new LinkedList()); + } + + + public String toString() + { + return "Synchronized LinkedList"; + } + }; + + static Factory PLAIN = new Factory() + { + public Queue create() + { + return new LinkedList(); + } + + public String toString() + { + return "Plain LinkedList"; + } + }; + + static class SynchronizedQueue implements Queue + { + private final Queue queue; + + SynchronizedQueue(Queue queue) + { + this.queue = queue; + } + + public synchronized E element() + { + return queue.element(); + } + + public synchronized boolean offer(E o) + { + return queue.offer(o); + } + + public synchronized E peek() + { + return queue.peek(); + } + + public synchronized E poll() + { + return queue.poll(); + } + + public synchronized E remove() + { + return queue.remove(); + } + + public synchronized int size() + { + return queue.size(); + } + + public synchronized boolean isEmpty() + { + return queue.isEmpty(); + } + + public synchronized boolean contains(Object o) + { + return queue.contains(o); + } + + public synchronized Iterator iterator() + { + return queue.iterator(); + } + + public synchronized Object[] toArray() + { + return queue.toArray(); + } + + public synchronized T[] toArray(T[] a) + { + return queue.toArray(a); + } + + public synchronized boolean add(E o) + { + return queue.add(o); + } + + public synchronized boolean remove(Object o) + { + return queue.remove(o); + } + + public synchronized boolean containsAll(Collection c) + { + return queue.containsAll(c); + } + + public synchronized boolean addAll(Collection c) + { + return queue.addAll(c); + } + + public synchronized boolean removeAll(Collection c) + { + return queue.removeAll(c); + } + + public synchronized boolean retainAll(Collection c) + { + return queue.retainAll(c); + } + + public synchronized void clear() + { + queue.clear(); + } + } + + static void run(String label, AveragedRun test) throws Exception + { + RunStats stats = test.call(); + System.out.println((label == null ? "" : label + ", ") + test + + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin()); + } + + public static void main(String[] argv) throws Exception + { + Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT}; + int iterations = 5; + String label = argv.length > 0 ? argv[0]: null; + System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); + //vary number of queues: + + for(Factory f : factories) + { + run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations)); + run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations)); + run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations)); + run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations)); + run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations)); + } + } + +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java new file mode 100644 index 0000000000..6490b9f270 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java @@ -0,0 +1,174 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.exchange.AbstractExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.handler.OnCurrentThreadExecutor; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.MockIoSession; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.util.AveragedRun; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.apache.qpid.server.util.TimedRun; + +import java.util.ArrayList; +import java.util.List; + +public class SendPerfTest extends TimedRun +{ + private int _messages = 1000; + private int _clients = 10; + private List _queues; + + public SendPerfTest(int clients, int messages) + { + super("SendPerfTest, msgs=" + messages + ", clients=" + clients); + _messages = messages; + _clients = clients; + } + + protected void setup() throws Exception + { + _queues = initQueues(_clients); + System.gc(); + } + + protected void teardown() throws Exception + { + System.gc(); + } + + protected void run() throws Exception + { + deliver(_messages, _queues); + } + + //have a dummy AMQProtocolSession that does nothing on the writeFrame() + //set up x number of queues + //create necessary bits and pieces to deliver a message + //deliver y messages to each queue + + public static void main(String[] argv) throws Exception + { + ApplicationRegistry.initialise(new TestApplicationRegistry()); + int clients = Integer.parseInt(argv[0]); + int messages = Integer.parseInt(argv[1]); + int iterations = Integer.parseInt(argv[2]); + AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations); + test.run(); + } + + /** + * Delivers messages to a number of queues. + * @param count the number of messages to deliver + * @param queues the list of queues + * @throws NoConsumersException + */ + static void deliver(int count, List queues) throws AMQException + { + BasicPublishBody publish = new BasicPublishBody(); + publish.exchange = new NullExchange().getName(); + ContentHeaderBody header = new ContentHeaderBody(); + List body = new ArrayList(); + MessageStore messageStore = new SkeletonMessageStore(); + body.add(new ContentBody()); + for (int i = 0; i < count; i++) + { + for (AMQQueue q : queues) + { + q.deliver(new AMQMessage(messageStore, i, publish, header, body)); + } + } + } + + static List initQueues(int number) throws AMQException + { + Exchange exchange = new NullExchange(); + List queues = new ArrayList(number); + for (int i = 0; i < number; i++) + { + AMQQueue q = createQueue("Queue" + (i + 1)); + q.bind("routingKey", exchange); + try + { + q.registerProtocolSession(createSession(), 1, "1", false); + } + catch (Exception e) + { + throw new AMQException("Error creating protocol session: " + e, e); + } + queues.add(q); + } + return queues; + } + + static AMQQueue createQueue(String name) throws AMQException + { + return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(), + new OnCurrentThreadExecutor()); + } + + static AMQProtocolSession createSession() throws Exception + { + IApplicationRegistry reg = ApplicationRegistry.getInstance(); + AMQCodecFactory codecFactory = new AMQCodecFactory(true); + AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory); + result.addChannel(new AMQChannel(1, null, null)); + return result; + } + + static class NullExchange extends AbstractExchange + { + public String getName() + { + return "NullExchange"; + } + + protected ExchangeMBean createMBean() + { + return null; + } + + public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + } + + public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException + { + } + + public void route(AMQMessage payload) throws AMQException + { + } + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java new file mode 100644 index 0000000000..fe0b686c1a --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionManagerTest.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; +import junit.framework.JUnit4TestAdapter; + +public class SubscriptionManagerTest +{ + private final SubscriptionSet mgr = new SubscriptionSet(); + + @Test + public void basicSubscriptionManagement() + { + assertTrue(mgr.isEmpty()); + assertFalse(mgr.hasActiveSubscribers()); + TestSubscription s1 = new TestSubscription("S1"); + mgr.addSubscriber(s1); + assertFalse(mgr.isEmpty()); + assertTrue(mgr.hasActiveSubscribers()); + + TestSubscription s2 = new TestSubscription("S2"); + mgr.addSubscriber(s2); + + s2.setSuspended(true); + assertFalse(mgr.isEmpty()); + assertTrue(mgr.hasActiveSubscribers()); + assertTrue(s2.isSuspended()); + assertFalse(s1.isSuspended()); + + s1.setSuspended(true); + assertFalse(mgr.hasActiveSubscribers()); + + mgr.removeSubscriber(new TestSubscription("S1")); + assertFalse(mgr.isEmpty()); + mgr.removeSubscriber(new TestSubscription("S2")); + assertTrue(mgr.isEmpty()); + } + + @Test + public void roundRobin() + { + TestSubscription a = new TestSubscription("A"); + TestSubscription b = new TestSubscription("B"); + TestSubscription c = new TestSubscription("C"); + TestSubscription d = new TestSubscription("D"); + mgr.addSubscriber(a); + mgr.addSubscriber(b); + mgr.addSubscriber(c); + mgr.addSubscriber(d); + + for (int i = 0; i < 3; i++) + { + assertEquals(a, mgr.nextSubscriber(null)); + assertEquals(b, mgr.nextSubscriber(null)); + assertEquals(c, mgr.nextSubscriber(null)); + assertEquals(d, mgr.nextSubscriber(null)); + } + + c.setSuspended(true); + + for (int i = 0; i < 3; i++) + { + assertEquals(a, mgr.nextSubscriber(null)); + assertEquals(b, mgr.nextSubscriber(null)); + assertEquals(d, mgr.nextSubscriber(null)); + } + + mgr.removeSubscriber(a); + d.setSuspended(true); + c.setSuspended(false); + Subscription e = new TestSubscription("D"); + mgr.addSubscriber(e); + + for (int i = 0; i < 3; i++) + { + assertEquals(b, mgr.nextSubscriber(null)); + assertEquals(c, mgr.nextSubscriber(null)); + assertEquals(e, mgr.nextSubscriber(null)); + } + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(SubscriptionManagerTest.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java new file mode 100644 index 0000000000..e6b0f7e885 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionSetTest.java @@ -0,0 +1,152 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.JUnit4TestAdapter; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import org.junit.Test; + +public class SubscriptionSetTest +{ + /** + * A SubscriptionSet that counts the number of items scanned. + */ + static class TestSubscriptionSet extends SubscriptionSet + { + private int scanned = 0; + + void resetScanned() + { + scanned = 0; + } + + protected void subscriberScanned() + { + ++scanned; + } + + int getScanned() + { + return scanned; + } + } + + final TestSubscription sub1 = new TestSubscription("1"); + final TestSubscription sub2 = new TestSubscription("2"); + final TestSubscription sub3 = new TestSubscription("3"); + + final TestSubscription suspendedSub1 = new TestSubscription("sus1", true); + final TestSubscription suspendedSub2 = new TestSubscription("sus2", true); + final TestSubscription suspendedSub3 = new TestSubscription("sus3", true); + + @Test + public void nextMessage() + { + SubscriptionSet ss = new SubscriptionSet(); + assertNull(ss.nextSubscriber(null)); + assertEquals(0, ss.getCurrentSubscriber()); + + ss.addSubscriber(sub1); + assertEquals(sub1, ss.nextSubscriber(null)); + assertEquals(1, ss.getCurrentSubscriber()); + assertEquals(sub1, ss.nextSubscriber(null)); + assertEquals(1, ss.getCurrentSubscriber()); + + ss.addSubscriber(sub2); + ss.addSubscriber(sub3); + + assertEquals(sub2, ss.nextSubscriber(null)); + assertEquals(2, ss.getCurrentSubscriber()); + + assertEquals(sub3, ss.nextSubscriber(null)); + assertEquals(3, ss.getCurrentSubscriber()); + } + + @Test + public void nextMessageWhenAllSuspended() + { + SubscriptionSet ss = createAllSuspendedSubscriptionSet(); + assertNull(ss.nextSubscriber(null)); + assertEquals(3, ss.getCurrentSubscriber()); + + assertNull(ss.nextSubscriber(null)); + assertEquals(3, ss.getCurrentSubscriber()); + } + + private TestSubscriptionSet createAllSuspendedSubscriptionSet() + { + TestSubscriptionSet ss = new TestSubscriptionSet(); + ss.addSubscriber(suspendedSub1); + ss.addSubscriber(suspendedSub2); + ss.addSubscriber(suspendedSub3); + return ss; + } + + @Test + public void nextMessageAfterRemove() + { + SubscriptionSet ss = new SubscriptionSet(); + ss.addSubscriber(suspendedSub1); + ss.addSubscriber(suspendedSub2); + ss.addSubscriber(sub3); + assertEquals(sub3, ss.nextSubscriber(null)); + assertEquals(3, ss.getCurrentSubscriber()); + + assertEquals(suspendedSub1, ss.removeSubscriber(suspendedSub1)); + + assertEquals(sub3, ss.nextSubscriber(null)); // Current implementation handles OutOfBoundsException here. + assertEquals(2, ss.getCurrentSubscriber()); + } + + @Test + public void nextMessageOverScanning() + { + TestSubscriptionSet ss = new TestSubscriptionSet(); + TestSubscription sub = new TestSubscription("test"); + ss.addSubscriber(suspendedSub1); + ss.addSubscriber(sub); + ss.addSubscriber(suspendedSub3); + assertEquals(sub, ss.nextSubscriber(null)); + assertEquals(2, ss.getCurrentSubscriber()); + assertEquals(2, ss.getScanned()); + + ss.resetScanned(); + sub.setSuspended(true); + assertNull(ss.nextSubscriber(null)); + assertEquals(3, ss.getCurrentSubscriber()); + // Current implementation overscans by one item here. + assertEquals(ss.size() + 1, ss.getScanned()); + } + + @Test + public void nextMessageOverscanWorstCase() { + TestSubscriptionSet ss = createAllSuspendedSubscriptionSet(); + ss.nextSubscriber(null); + // Scans the subscriptions twice. + assertEquals(ss.size() * 2, ss.getScanned()); + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(SubscriptionSetTest.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java new file mode 100644 index 0000000000..faa6bdcbe7 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.queue.SynchronizedDeliveryManager; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.ConcurrentDeliveryManager; +import org.apache.qpid.server.queue.DeliveryManagerTest; +import org.apache.qpid.AMQException; +import junit.framework.JUnit4TestAdapter; + +public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest +{ + public SynchronizedDeliveryManagerTest() throws Exception + { + try + { + System.setProperty("concurrentdeliverymanager","false"); + _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false, + new DefaultQueueRegistry())); + } + catch (Throwable t) + { + t.printStackTrace(); + throw new AMQException("Could not initialise delivery manager", t); + } + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(SynchronizedDeliveryManagerTest.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java new file mode 100644 index 0000000000..de6df4bc03 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.ArrayList; +import java.util.List; + +public class TestSubscription implements Subscription +{ + private final List messages; + private final Object key; + private boolean isSuspended; + + public TestSubscription(Object key) + { + this(key, new ArrayList()); + } + + public TestSubscription(final Object key, final boolean isSuspended) + { + this(key); + setSuspended(isSuspended); + } + + TestSubscription(Object key, List messages) + { + this.key = key; + this.messages = messages; + } + + List getMessages() + { + return messages; + } + + public void send(AMQMessage msg, AMQQueue queue) + { + messages.add(msg); + } + + public void setSuspended(boolean suspended) + { + isSuspended = suspended; + } + + public boolean isSuspended() + { + return isSuspended; + } + + public void queueDeleted(AMQQueue queue) + { + } + + public int hashCode() + { + return key.hashCode(); + } + + public boolean equals(Object o) + { + return o instanceof TestSubscription && ((TestSubscription) o).key.equals(key); + } + + public String toString() + { + return key.toString(); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/UnitTests.java b/java/systests/src/test/java/org/apache/qpid/server/queue/UnitTests.java new file mode 100644 index 0000000000..fbdcaf7467 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/UnitTests.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import junit.framework.JUnit4TestAdapter; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + AckTest.class, + DeliveryManagerTest.class, + SubscriptionManagerTest.class, + SubscriptionSetTest.class, + ConcurrencyTest.class} +) +public class UnitTests +{ + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(UnitTests.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java new file mode 100644 index 0000000000..bc0a8a7d64 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.AMQException; +import org.apache.commons.configuration.Configuration; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A message store that does nothing. Designed to be used in tests that do not want to use any message store + * functionality. + */ +public class SkeletonMessageStore implements MessageStore +{ + private final AtomicLong _messageId = new AtomicLong(1); + + public void configure(String base, Configuration config) throws Exception + { + } + + public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception + { + } + + public void close() throws Exception + { + } + + public void put(AMQMessage msg) + { + } + + public void removeMessage(long messageId) + { + } + + public void createQueue(AMQQueue queue) throws AMQException + { + } + + public void removeQueue(String name) throws AMQException + { + } + + public void enqueueMessage(String name, long messageId) throws AMQException + { + } + + public void dequeueMessage(String name, long messageId) throws AMQException + { + } + + public void beginTran() throws AMQException + { + } + + public boolean inTran() + { + return false; + } + + public void commitTran() throws AMQException + { + } + + public void abortTran() throws AMQException + { + } + + public List createQueues() throws AMQException + { + return null; + } + + public long getNewMessageId() + { + return _messageId.getAndIncrement(); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java new file mode 100644 index 0000000000..7431e6e4cc --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import junit.framework.JUnit4TestAdapter; +import org.junit.Test; +import org.junit.Assert; +import org.junit.Before; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.AMQException; + +/** + * Tests that reference counting works correctly with AMQMessage and the message store + */ +public class TestReferenceCounting +{ + private TestableMemoryMessageStore _store; + + @Before + public void createCommonObjects() + { + _store = new TestableMemoryMessageStore(); + } + + /** + * Check that when the reference count is decremented the message removes itself from the store + */ + @Test + public void testMessageGetsRemoved() throws AMQException + { + AMQMessage message = new AMQMessage(_store, null); + _store.put(message); + Assert.assertTrue(_store.getMessageMap().size() == 1); + message.decrementReference(); + Assert.assertTrue(_store.getMessageMap().size() == 0); + } + + @Test + public void testMessageRemains() throws AMQException + { + AMQMessage message = new AMQMessage(_store, null); + _store.put(message); + Assert.assertTrue(_store.getMessageMap().size() == 1); + message.incrementReference(); + message.decrementReference(); + Assert.assertTrue(_store.getMessageMap().size() == 1); + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(TestReferenceCounting.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java new file mode 100644 index 0000000000..be7687a22c --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.queue.AMQMessage; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Adds some extra methods to the memory message store for testing purposes. + */ +public class TestableMemoryMessageStore extends MemoryMessageStore +{ + public TestableMemoryMessageStore() + { + _messageMap = new ConcurrentHashMap(); + } + + public ConcurrentMap getMessageMap() + { + return _messageMap; + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/store/UnitTests.java b/java/systests/src/test/java/org/apache/qpid/server/store/UnitTests.java new file mode 100644 index 0000000000..8e11f823a9 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/store/UnitTests.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import junit.framework.JUnit4TestAdapter; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + TestReferenceCounting.class +}) +public class UnitTests +{ + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(UnitTests.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java b/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java new file mode 100644 index 0000000000..f1b14b69b8 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java @@ -0,0 +1,308 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import junit.framework.JUnit4TestAdapter; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import org.junit.Ignore; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; + +import java.util.LinkedList; + +public class TxnBufferTest +{ + private final LinkedList ops = new LinkedList(); + + @Before + public void setup() throws Exception + { + } + + @Test + public void commit() throws AMQException + { + MockStore store = new MockStore(); + + TxnBuffer buffer = new TxnBuffer(store); + buffer.enlist(new MockOp().expectPrepare().expectCommit()); + //check relative ordering + MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit(); + buffer.enlist(op); + buffer.enlist(op); + buffer.enlist(new MockOp().expectPrepare().expectCommit()); + + buffer.commit(); + + validateOps(); + store.validate(); + } + + @Test + public void rollback() throws AMQException + { + MockStore store = new MockStore(); + + TxnBuffer buffer = new TxnBuffer(store); + buffer.enlist(new MockOp().expectRollback()); + buffer.enlist(new MockOp().expectRollback()); + buffer.enlist(new MockOp().expectRollback()); + + buffer.rollback(); + + validateOps(); + store.validate(); + } + + @Test + public void commitWithFailureDuringPrepare() throws AMQException + { + MockStore store = new MockStore(); + store.expectBegin().expectAbort(); + + TxnBuffer buffer = new TxnBuffer(store); + buffer.containsPersistentChanges(); + buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); + buffer.enlist(new TxnTester(store)); + buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); + buffer.enlist(new FailedPrepare()); + buffer.enlist(new MockOp()); + + buffer.commit(); + validateOps(); + store.validate(); + } + + @Test + public void commitWithPersistance() throws AMQException + { + MockStore store = new MockStore(); + store.expectBegin().expectCommit(); + + TxnBuffer buffer = new TxnBuffer(store); + buffer.enlist(new MockOp().expectPrepare().expectCommit()); + buffer.enlist(new MockOp().expectPrepare().expectCommit()); + buffer.enlist(new MockOp().expectPrepare().expectCommit()); + buffer.enlist(new TxnTester(store)); + buffer.containsPersistentChanges(); + + buffer.commit(); + validateOps(); + store.validate(); + } + + private void validateOps() + { + for(MockOp op : ops) + { + op.validate(); + } + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(TxnBufferTest.class); + } + + class MockOp implements TxnOp + { + final Object PREPARE = "PREPARE"; + final Object COMMIT = "COMMIT"; + final Object UNDO_PREPARE = "UNDO_PREPARE"; + final Object ROLLBACK = "ROLLBACK"; + + private final LinkedList expected = new LinkedList(); + + MockOp() + { + ops.add(this); + } + + public void prepare() + { + assertEquals(expected.removeLast(), PREPARE); + } + + public void commit() + { + assertEquals(expected.removeLast(), COMMIT); + } + + public void undoPrepare() + { + assertEquals(expected.removeLast(), UNDO_PREPARE); + } + + public void rollback() + { + assertEquals(expected.removeLast(), ROLLBACK); + } + + private MockOp expect(Object optype) + { + expected.addFirst(optype); + return this; + } + + MockOp expectPrepare() + { + return expect(PREPARE); + } + + MockOp expectCommit() + { + return expect(COMMIT); + } + + MockOp expectUndoPrepare() + { + return expect(UNDO_PREPARE); + } + + MockOp expectRollback() + { + return expect(ROLLBACK); + } + + void validate() + { + assertEquals("Expected ops were not all invoked", new LinkedList(), expected); + } + + void clear() + { + expected.clear(); + } + } + + class MockStore extends TestableMemoryMessageStore + { + final Object BEGIN = "BEGIN"; + final Object ABORT = "ABORT"; + final Object COMMIT = "COMMIT"; + + private final LinkedList expected = new LinkedList(); + private boolean inTran; + + public void beginTran() throws AMQException + { + assertEquals(expected.removeLast(), BEGIN); + inTran = true; + } + + public void commitTran() throws AMQException + { + assertEquals(expected.removeLast(), COMMIT); + inTran = false; + } + + public void abortTran() throws AMQException + { + assertEquals(expected.removeLast(), ABORT); + inTran = false; + } + + public boolean inTran() + { + return inTran; + } + + private MockStore expect(Object optype) + { + expected.addFirst(optype); + return this; + } + + MockStore expectBegin() + { + return expect(BEGIN); + } + + MockStore expectCommit() + { + return expect(COMMIT); + } + + MockStore expectAbort() + { + return expect(ABORT); + } + + void clear() + { + expected.clear(); + } + + void validate() + { + assertEquals("Expected ops were not all invoked", new LinkedList(), expected); + } + } + + class NullOp implements TxnOp + { + public void prepare() throws AMQException + { + } + public void commit() + { + } + public void undoPrepare() + { + } + public void rollback() + { + } + } + + class FailedPrepare extends NullOp + { + public void prepare() throws AMQException + { + throw new AMQException("Fail!"); + } + } + + class TxnTester extends NullOp + { + private final MessageStore store; + + TxnTester(MessageStore store) + { + this.store = store; + } + + public void prepare() throws AMQException + { + assertTrue("Expected prepare to be performed under txn", store.inTran()); + } + + public void commit() + { + assertTrue("Expected commit not to be performed under txn", !store.inTran()); + } + } + +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/txn/UnitTests.java b/java/systests/src/test/java/org/apache/qpid/server/txn/UnitTests.java new file mode 100644 index 0000000000..975e8eea72 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/txn/UnitTests.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.txn; + +import junit.framework.JUnit4TestAdapter; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + TxnBufferTest.class +}) +public class UnitTests +{ + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(UnitTests.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java b/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java new file mode 100644 index 0000000000..1d17985ab5 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/util/AveragedRun.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import org.apache.qpid.server.util.TimedRun; + +import java.util.concurrent.Callable; +import java.util.Collection; + +public class AveragedRun implements Callable +{ + private final RunStats stats = new RunStats(); + private final TimedRun test; + private final int iterations; + + public AveragedRun(TimedRun test, int iterations) + { + this.test = test; + this.iterations = iterations; + } + + public RunStats call() throws Exception + { + for (int i = 0; i < iterations; i++) + { + stats.record(test.call()); + } + return stats; + } + + public void run() throws Exception + { + System.out.println(test + ": " + call()); + } + + public String toString() + { + return test.toString(); + } + + static void run(Collection tests) throws Exception + { + for(AveragedRun test : tests) + { + test.run(); + } + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java b/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java new file mode 100644 index 0000000000..1ae8d3205d --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/util/ConcurrentTest.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +public class ConcurrentTest extends TimedRun +{ + private final TimedRun _test; + private final Thread[] _threads; + + public ConcurrentTest(TimedRun test, int threads) + { + super(test.toString()); + _test = test; + _threads = new Thread[threads]; + } + + protected void setup() throws Exception + { + _test.setup(); + for(int i = 0; i < _threads.length; i++) + { + _threads[i] = new Thread(new Runner()); + } + } + + protected void teardown() throws Exception + { + _test.teardown(); + } + + protected void run() throws Exception + { + for(Thread t : _threads) + { + t.start(); + } + for(Thread t : _threads) + { + t.join(); + } + } + + private class Runner implements Runnable + { + private Exception error; + + public void run() + { + try + { + _test.run(); + } + catch(Exception e) + { + error = e; + e.printStackTrace(); + } + } + } + +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java b/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java new file mode 100644 index 0000000000..ec67fc68b3 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/util/RunStats.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +public class RunStats +{ + private long min = Long.MAX_VALUE; + private long max; + private long total; + private int count; + + public void record(long time) + { + max = Math.max(time, max); + min = Math.min(time, min); + total += time; + count++; + } + + public long getMin() + { + return min; + } + + public long getMax() + { + return max; + } + + public long getAverage() + { + return total / count; + } + + public String toString() + { + return "avg=" + getAverage() + ", min=" + min + ", max=" + max; + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java new file mode 100644 index 0000000000..f801daf27c --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -0,0 +1,107 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.ManagedObjectRegistry; +import org.apache.qpid.server.management.NoopManagedObjectRegistry; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.auth.AuthenticationManager; +import org.apache.qpid.server.security.auth.NullAuthenticationManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.MapConfiguration; + +import java.util.HashMap; + +public class TestApplicationRegistry extends ApplicationRegistry +{ + private QueueRegistry _queueRegistry; + + private ExchangeRegistry _exchangeRegistry; + + private ExchangeFactory _exchangeFactory; + + private ManagedObjectRegistry _managedObjectRegistry; + + private AuthenticationManager _authenticationManager; + + private MessageStore _messageStore; + + public TestApplicationRegistry() + { + super(new MapConfiguration(new HashMap())); + } + + public void initialise() throws Exception + { + _managedObjectRegistry = new NoopManagedObjectRegistry(); + _queueRegistry = new DefaultQueueRegistry(); + _exchangeFactory = new DefaultExchangeFactory(); + _exchangeRegistry = new DefaultExchangeRegistry(_exchangeFactory); + _authenticationManager = new NullAuthenticationManager(); + _messageStore = new TestableMemoryMessageStore(); + + _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes + } + + public Configuration getConfiguration() + { + return _configuration; + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public ManagedObjectRegistry getManagedObjectRegistry() + { + return _managedObjectRegistry; + } + + public AuthenticationManager getAuthenticationManager() + { + return _authenticationManager; + } + + public MessageStore getMessageStore() + { + return _messageStore; + } +} + diff --git a/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java b/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java new file mode 100644 index 0000000000..1291380311 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/server/util/TimedRun.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.util.concurrent.Callable; + +public abstract class TimedRun implements Callable +{ + private final String description; + + public TimedRun(String description) + { + this.description = description; + } + + public Long call() throws Exception + { + setup(); + long start = System.currentTimeMillis(); + run(); + long stop = System.currentTimeMillis(); + teardown(); + return stop - start; + } + + public String toString() + { + return description; + } + + protected void setup() throws Exception{} + protected void teardown() throws Exception{} + protected abstract void run() throws Exception; +} diff --git a/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java new file mode 100644 index 0000000000..53d83fb6a3 --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java @@ -0,0 +1,237 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import junit.framework.JUnit4TestAdapter; +import org.apache.log4j.Logger; +import org.apache.log4j.xml.DOMConfigurator; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.util.TestApplicationRegistry; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.*; + +public class DisconnectAndRedeliverTest +{ + private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class); + + + static + { + String workdir = System.getProperty("QPID_WORK"); + if (workdir == null || workdir.equals("")) + { + String tempdir = System.getProperty("java.io.tmpdir"); + System.out.println("QPID_WORK not set using tmp directory: " + tempdir); + System.setProperty("QPID_WORK", tempdir); + } + //DOMConfigurator.configure("../etc/log4j.xml"); + DOMConfigurator.configure("broker/etc/log4j.xml"); + } + + @Before + public void resetAppliactionRegistry() throws Exception + { + createVMBroker(); + ApplicationRegistry.initialise(new TestApplicationRegistry(), 1); + } + + + public void createVMBroker() + { + try + { + TransportConnection.createVMBroker(1); + } + catch (AMQVMBrokerCreationException e) + { + Assert.fail("Unable to create broker: " + e); + } + } + + @After + public void stopVmBroker() + { + TransportConnection.killVMBroker(1); + } + + /** + * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when + * the channel is closed. + * + * @throws Exception + */ + @Test + public void disconnectRedeliversMessages() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); + + Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + AMQQueue queue = new AMQQueue("someQ", "someQ", false, false); + MessageConsumer consumer = consumerSession.createConsumer(queue); + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + + + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + _logger.info("Sending four messages"); + producer.send(producerSession.createTextMessage("msg1")); + producer.send(producerSession.createTextMessage("msg2")); + producer.send(producerSession.createTextMessage("msg3")); + producer.send(producerSession.createTextMessage("msg4")); + + con2.close(); + + _logger.info("Starting connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(); + tm.acknowledge(); + _logger.info("Received and acknowledged first message"); + consumer.receive(); + consumer.receive(); + consumer.receive(); + _logger.info("Received all four messages. About to disconnect and reconnect"); + + con.close(); + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = consumerSession.createConsumer(queue); + + _logger.info("Starting second consumer connection"); + con.start(); + + tm = (TextMessage) consumer.receive(3000); + Assert.assertEquals("msg2", tm.getText()); + + + tm = (TextMessage) consumer.receive(3000); + Assert.assertEquals("msg3", tm.getText()); + + + tm = (TextMessage) consumer.receive(3000); + Assert.assertEquals("msg4", tm.getText()); + + _logger.info("Received redelivery of three messages. Acknowledging last message"); + tm.acknowledge(); + + con.close(); + + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = consumerSession.createConsumer(queue); + _logger.info("Starting third consumer connection"); + con.start(); + tm = (TextMessage) consumer.receiveNoWait(); + Assert.assertNull(tm); + _logger.info("No messages redelivered as is expected"); + con.close(); + + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = consumerSession.createConsumer(queue); + _logger.info("Starting fourth consumer connection"); + con.start(); + tm = (TextMessage) consumer.receive(3000); + Assert.assertNull(tm); + _logger.info("No messages redelivered as is expected"); + con.close(); + + _logger.info("Actually:" + store.getMessageMap().size()); + // Assert.assertTrue(store.getMessageMap().size() == 0); + } + + /** + * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be + * requeued (due perhaps to the queue being deleted). + * + * @throws Exception + */ + @Test + public void disconnectWithTransientQueueThrowsAwayMessages() throws Exception + { + + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); + Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = new AMQQueue("someQ", "someQ", false, true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + //force synch to ensure the consumer has resulted in a bound queue + ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); + + Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + _logger.info("Sending four messages"); + producer.send(producerSession.createTextMessage("msg1")); + producer.send(producerSession.createTextMessage("msg2")); + producer.send(producerSession.createTextMessage("msg3")); + producer.send(producerSession.createTextMessage("msg4")); + + con2.close(); + + _logger.info("Starting connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(); + tm.acknowledge(); + _logger.info("Received and acknowledged first message"); + consumer.receive(); + consumer.receive(); + consumer.receive(); + _logger.info("Received all four messages. About to disconnect and reconnect"); + + con.close(); + con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = consumerSession.createConsumer(queue); + + _logger.info("Starting second consumer connection"); + con.start(); + + tm = (TextMessage) consumer.receiveNoWait(); + Assert.assertNull(tm); + _logger.info("No messages redelivered as is expected"); + + _logger.info("Actually:" + store.getMessageMap().size()); + Assert.assertTrue(store.getMessageMap().size() == 0); + con.close(); + } + + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(DisconnectAndRedeliverTest.class); + } +} diff --git a/java/systests/src/test/java/org/apache/qpid/test/unit/ack/UnitTests.java b/java/systests/src/test/java/org/apache/qpid/test/unit/ack/UnitTests.java new file mode 100644 index 0000000000..498667e2bc --- /dev/null +++ b/java/systests/src/test/java/org/apache/qpid/test/unit/ack/UnitTests.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.ack; + +import junit.framework.JUnit4TestAdapter; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({DisconnectAndRedeliverTest.class}) +public class UnitTests +{ + public static junit.framework.Test suite() + { + return new JUnit4TestAdapter(UnitTests.class); + } +} -- cgit v1.2.1