diff options
| author | Aidan Skinner <aidan@apache.org> | 2009-01-19 15:53:43 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2009-01-19 15:53:43 +0000 |
| commit | 0269fb662e844aa90ec659288fde3cd86643e6e4 (patch) | |
| tree | c5ad6c34d6a46507d46ddcfc4c3c6e9facf91613 /qpid/java/systests | |
| parent | b1f26965fd674c21cbbe5d7fa121d95d43c2aa39 (diff) | |
| download | qpid-python-0269fb662e844aa90ec659288fde3cd86643e6e4.tar.gz | |
QPID-1573: Move unit tests that were living in systests into appropriate module. Fix up a few bugs in other tests that this exposed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@735735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests')
14 files changed, 0 insertions, 2930 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java deleted file mode 100644 index b9b3168fcc..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server; - -import junit.framework.TestCase; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.management.ManagedBroker; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.client.transport.TransportConnection; - -public class AMQBrokerManagerMBeanTest extends TestCase -{ - private QueueRegistry _queueRegistry; - private ExchangeRegistry _exchangeRegistry; - - public void testExchangeOperations() throws Exception - { - String exchange1 = "testExchange1_" + System.currentTimeMillis(); - String exchange2 = "testExchange2_" + System.currentTimeMillis(); - String exchange3 = "testExchange3_" + System.currentTimeMillis(); - - assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null); - assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null); - assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null); - - VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); - - ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) vHost.getManagedObject()); - mbean.createNewExchange(exchange1, "direct", false); - mbean.createNewExchange(exchange2, "topic", false); - mbean.createNewExchange(exchange3, "headers", false); - - assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) != null); - assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) != null); - assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) != null); - - mbean.unregisterExchange(exchange1); - mbean.unregisterExchange(exchange2); - mbean.unregisterExchange(exchange3); - - assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange1)) == null); - assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange2)) == null); - assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null); - } - - public void testQueueOperations() throws Exception - { - String queueName = "testQueue_" + System.currentTimeMillis(); - VirtualHost vHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); - - ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) vHost.getManagedObject()); - - assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null); - - mbean.createNewQueue(queueName, "test", false); - assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) != null); - - mbean.deleteQueue(queueName); - assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null); - } - - @Override - protected void setUp() throws Exception - { - super.setUp(); - IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - _queueRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry(); - _exchangeRegistry = appRegistry.getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry(); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java deleted file mode 100644 index aafddb810a..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.ack; - -import junit.framework.TestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.queue.AMQMessageHandle; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.txn.TransactionalContext; - -import java.util.*; - -public class TxAckTest extends TestCase -{ - private Scenario individual; - private Scenario multiple; - private Scenario combined; - - protected void setUp() throws Exception - { - super.setUp(); - - //ack only 5th msg - individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l)); - individual.update(5, false); - - //ack all up to and including 5th msg - multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l)); - multiple.update(5, true); - - //leave only 8th and 9th unacked - combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l)); - combined.update(3, false); - combined.update(5, true); - combined.update(7, true); - combined.update(2, true);//should be ignored - combined.update(1, false);//should be ignored - combined.update(10, false); - } - - public void testPrepare() throws AMQException - { - individual.prepare(); - multiple.prepare(); - combined.prepare(); - } - - public void testUndoPrepare() throws AMQException - { - individual.undoPrepare(); - multiple.undoPrepare(); - combined.undoPrepare(); - } - - public void testCommit() throws AMQException - { - individual.commit(); - multiple.commit(); - combined.commit(); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TxAckTest.class); - } - - private class Scenario - { - private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000); - private final TxAck _op = new TxAck(_map); - private final List<Long> _acked; - private final List<Long> _unacked; - private StoreContext _storeContext = new StoreContext(); - - Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception - { - TransactionalContext txnContext = new NonTransactionalContext(new TestMemoryMessageStore(), - _storeContext, null, - new LinkedList<RequiredDeliveryException>() - ); - AMQQueue queue = - AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("test", new MemoryMessageStore()), - null); - - for (int i = 0; i < messageCount; i++) - { - long deliveryTag = i + 1; - - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); - _map.add(deliveryTag, queue.enqueue(new StoreContext(), message)); - } - _acked = acked; - _unacked = unacked; - } - - void update(long deliverytag, boolean multiple) - { - _op.update(deliverytag, multiple); - } - - private void assertCount(List<Long> tags, int expected) - { - for (long tag : tags) - { - QueueEntry u = _map.get(tag); - assertTrue("Message not found for tag " + tag, u != null); - ((TestMessage) u.getMessage()).assertCountEquals(expected); - } - } - - void prepare() throws AMQException - { - _op.consolidate(); - _op.prepare(_storeContext); - - assertCount(_acked, -1); - assertCount(_unacked, 0); - - } - - void undoPrepare() - { - _op.consolidate(); - _op.undoPrepare(); - - assertCount(_acked, 1); - assertCount(_unacked, 0); - } - - void commit() - { - _op.consolidate(); - _op.commit(_storeContext); - - //check acked messages are removed from map - Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags()); - keys.retainAll(_acked); - assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty()); - //check unacked messages are still in map - keys = new HashSet<Long>(_unacked); - keys.removeAll(_map.getDeliveryTags()); - assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty()); - } - } - - private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) - { - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - null, - false); - try - { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), - publishBody, - new ContentHeaderBody() - { - public int getSize() - { - return 1; - } - }); - } - catch (AMQException e) - { - // won't happen - } - - - return amqMessageHandle; - } - - - private class TestMessage extends AMQMessage - { - private final long _tag; - private int _count; - - TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) - throws AMQException - { - super(createMessageHandle(messageId, publishBody), storeContext, publishBody); - _tag = tag; - } - - - public boolean incrementReference() - { - _count++; - return true; - } - - public void decrementReference(StoreContext context) - { - _count--; - } - - void assertCountEquals(int expected) - { - assertEquals("Wrong count for message with tag " + _tag, expected, _count); - } - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java deleted file mode 100644 index 6dcb187a37..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ /dev/null @@ -1,562 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import junit.framework.TestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.queue.*; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.store.MemoryMessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.log4j.Logger; - -import java.util.*; - -public class AbstractHeadersExchangeTestBase extends TestCase -{ - private static final Logger _log = Logger.getLogger(AbstractHeadersExchangeTestBase.class); - - private final HeadersExchange exchange = new HeadersExchange(); - protected final Set<TestQueue> queues = new HashSet<TestQueue>(); - - /** - * Not used in this test, just there to stub out the routing calls - */ - private MessageStore _store = new MemoryMessageStore(); - - private StoreContext _storeContext = new StoreContext(); - - private MessageHandleFactory _handleFactory = new MessageHandleFactory(); - - private int count; - - public void testDoNothing() - { - // this is here only to make junit under Eclipse happy - } - - protected TestQueue bindDefault(String... bindings) throws AMQException - { - return bind("Queue" + (++count), bindings); - } - - protected TestQueue bind(String queueName, String... bindings) throws AMQException - { - return bind(queueName, getHeaders(bindings)); - } - - protected TestQueue bind(String queue, FieldTable bindings) throws AMQException - { - return bind(new TestQueue(new AMQShortString(queue)), bindings); - } - - protected TestQueue bind(TestQueue queue, String... bindings) throws AMQException - { - return bind(queue, getHeaders(bindings)); - } - - protected TestQueue bind(TestQueue queue, FieldTable bindings) throws AMQException - { - queues.add(queue); - exchange.registerQueue(null, queue, bindings); - return queue; - } - - - protected void route(Message m) throws AMQException - { - m.route(exchange); - m.getIncomingMessage().routingComplete(_store, _handleFactory); - if(m.getIncomingMessage().allContentReceived()) - { - m.getIncomingMessage().deliverToQueues(); - } - } - - protected void routeAndTest(Message m, TestQueue... expected) throws AMQException - { - routeAndTest(m, false, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException - { - routeAndTest(m, expectReturn, Arrays.asList(expected)); - } - - protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException - { - routeAndTest(m, false, expected); - } - - protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException - { - try - { - route(m); - assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn); - for (TestQueue q : queues) - { - if (expected.contains(q)) - { - assertTrue("Expected " + m + " to be delivered to " + q, q.isInQueue(m)); - //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q; - } - else - { - assertFalse("Did not expect " + m + " to be delivered to " + q, q.isInQueue(m)); - //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q; - } - } - } - - catch (NoRouteException ex) - { - assertTrue("Expected "+m+" not to be returned",expectReturn); - } - - } - - static FieldTable getHeaders(String... entries) - { - FieldTable headers = FieldTableFactory.newFieldTable(); - for (String s : entries) - { - String[] parts = s.split("=", 2); - headers.setObject(parts[0], parts.length > 1 ? parts[1] : ""); - } - return headers; - } - - - static final class MessagePublishInfoImpl implements MessagePublishInfo - { - private AMQShortString _exchange; - private boolean _immediate; - private boolean _mandatory; - private AMQShortString _routingKey; - - public MessagePublishInfoImpl(AMQShortString routingKey) - { - _routingKey = routingKey; - } - - public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey) - { - _exchange = exchange; - _immediate = immediate; - _mandatory = mandatory; - _routingKey = routingKey; - } - - public AMQShortString getExchange() - { - return _exchange; - } - - public boolean isImmediate() - { - return _immediate; - - } - - public boolean isMandatory() - { - return _mandatory; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - - public void setExchange(AMQShortString exchange) - { - _exchange = exchange; - } - - public void setImmediate(boolean immediate) - { - _immediate = immediate; - } - - public void setMandatory(boolean mandatory) - { - _mandatory = mandatory; - } - - public void setRoutingKey(AMQShortString routingKey) - { - _routingKey = routingKey; - } - } - - static MessagePublishInfo getPublishRequest(final String id) - { - return new MessagePublishInfoImpl(null, false, false, new AMQShortString(id)); - } - - static ContentHeaderBody getContentHeader(FieldTable headers) - { - ContentHeaderBody header = new ContentHeaderBody(); - header.properties = getProperties(headers); - return header; - } - - static BasicContentHeaderProperties getProperties(FieldTable headers) - { - BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); - properties.setHeaders(headers); - return properties; - } - - static class TestQueue extends SimpleAMQQueue - { - final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); - - public TestQueue(AMQShortString name) throws AMQException - { - super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test")); - ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry().registerQueue(this); - } - - /** - * We override this method so that the default behaviour, which attempts to use a delivery manager, is - * not invoked. It is unnecessary since for this test we only care to know whether the message was - * sent to the queue; the queue processing logic is not being tested. - * @param msg - * @throws AMQException - */ - @Override - public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException - { - messages.add( new HeadersExchangeTest.Message(msg)); - return new QueueEntry() - { - - public AMQQueue getQueue() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public AMQMessage getMessage() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getSize() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean getDeliveredToConsumer() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean expired() throws AMQException - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isAcquired() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean acquire() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean acquire(Subscription sub) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean delete() - { - return false; - } - - public boolean isDeleted() - { - return false; - } - - public boolean acquiredBySubscription() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setDeliveredToSubscription() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void release() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public String debugIdentity() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean immediateAndNotDelivered() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setRedelivered(boolean b) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public Subscription getDeliveredSubscription() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void reject() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void reject(Subscription subscription) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isRejectedBy(Subscription subscription) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public void requeue(StoreContext storeContext) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void dequeue(final StoreContext storeContext) throws FailedDequeueException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void dispose(final StoreContext storeContext) throws MessageCleanupException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void restoreCredit() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isQueueDeleted() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public void addStateChangeListener(StateChangeListener listener) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean removeStateChangeListener(StateChangeListener listener) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public int compareTo(final QueueEntry o) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - }; - } - - boolean isInQueue(Message msg) - { - return messages.contains(msg); - } - - } - - /** - * Just add some extra utility methods to AMQMessage to aid testing. - */ - static class Message extends AMQMessage - { - private class TestIncomingMessage extends IncomingMessage - { - - public TestIncomingMessage(final long messageId, - final MessagePublishInfo info, - final TransactionalContext txnContext, - final AMQProtocolSession publisher) - { - super(messageId, info, txnContext, publisher); - } - - - public AMQMessage getUnderlyingMessage() - { - return Message.this; - } - - - public ContentHeaderBody getContentHeaderBody() - { - try - { - return Message.this.getContentHeaderBody(); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - } - - private IncomingMessage _incoming; - - private static MessageStore _messageStore = new SkeletonMessageStore(); - - private static StoreContext _storeContext = new StoreContext(); - - - private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, - null, - new LinkedList<RequiredDeliveryException>() - ); - - Message(String id, String... headers) throws AMQException - { - this(id, getHeaders(headers)); - } - - Message(String id, FieldTable headers) throws AMQException - { - this(_messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null); - } - - public IncomingMessage getIncomingMessage() - { - return _incoming; - } - - private Message(long messageId, - MessagePublishInfo publish, - ContentHeaderBody header, - List<ContentBody> bodies) throws AMQException - { - super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish); - - - - _incoming = new TestIncomingMessage(getMessageId(),publish,_txnContext,new MockProtocolSession(_messageStore)); - _incoming.setContentHeaderBody(header); - - - } - - private static AMQMessageHandle createMessageHandle(final long messageId, - final MessagePublishInfo publish, - final ContentHeaderBody header) - { - - final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, - _messageStore, - true); - - try - { - amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header); - } - catch (AMQException e) - { - - } - return amqMessageHandle; - } - - private Message(AMQMessage msg) throws AMQException - { - super(msg); - } - - - - void route(Exchange exchange) throws AMQException - { - exchange.route(_incoming); - } - - - public int hashCode() - { - return getKey().hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof HeadersExchangeTest.Message && equals((HeadersExchangeTest.Message) o); - } - - private boolean equals(HeadersExchangeTest.Message m) - { - return getKey().equals(m.getKey()); - } - - public String toString() - { - return getKey().toString(); - } - - private Object getKey() - { - try - { - return getMessagePublishInfo().getRoutingKey(); - } - catch (AMQException e) - { - _log.error("Error getting routing key: " + e, e); - return null; - } - } - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java deleted file mode 100644 index fd11ddeae2..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.NullApplicationRegistry; -import org.apache.qpid.framing.BasicPublishBody; - -public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase -{ - protected void setUp() throws Exception - { - super.setUp(); - ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); - } - - protected void tearDown() - { - ApplicationRegistry.remove(1); - } - - public void testSimple() throws AMQException - { - TestQueue q1 = bindDefault("F0000"); - TestQueue q2 = bindDefault("F0000=Aardvark"); - TestQueue q3 = bindDefault("F0001"); - TestQueue q4 = bindDefault("F0001=Bear"); - TestQueue q5 = bindDefault("F0000", "F0001"); - TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear"); - TestQueue q7 = bindDefault("F0000", "F0001=Bear"); - TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); - - routeAndTest(new Message("Message1", "F0000"), q1); - routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2); - routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q5, q8); - routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q3, q4, q5, q7); - routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), - q1, q2, q3, q4, q5, q6, q7, q8); - routeAndTest(new Message("Message6", "F0002")); - - Message m7 = new Message("Message7", "XXXXX"); - - MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo()); - pb7.setMandatory(true); - routeAndTest(m7,true); - - Message m8 = new Message("Message8", "F0000"); - MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo()); - pb8.setMandatory(true); - routeAndTest(m8,false,q1); - - - } - - public void testAny() throws AMQException - { - TestQueue q1 = bindDefault("F0000", "F0001", "X-match=any"); - TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any"); - TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any"); - TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); - TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); - - routeAndTest(new Message("Message1", "F0000"), q1, q3); - routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2, q3, q4); - routeAndTest(new Message("Message3", "F0000=Aardvark", "F0001"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6); - routeAndTest(new Message("Message6", "F0002")); - } - - public void testMandatory() throws AMQException - { - bindDefault("F0000"); - Message m1 = new Message("Message1", "XXXXX"); - Message m2 = new Message("Message2", "F0000"); - MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo()); - pb1.setMandatory(true); - MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo()); - pb2.setMandatory(true); - routeAndTest(m1,true); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(HeadersExchangeTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/plugins/PluginTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/plugins/PluginTest.java deleted file mode 100644 index 0762a7a561..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/plugins/PluginTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.plugins; - -import java.util.Map; - -import org.apache.qpid.server.exchange.ExchangeType; - -import junit.framework.TestCase; - -public class PluginTest extends TestCase -{ - - private static final String TEST_EXCHANGE_CLASS = "org.apache.qpid.extras.exchanges.example.TestExchangeType"; - private static final String PLUGIN_DIRECTORY = System.getProperty("example.plugin.target"); - - public void testLoadExchanges() throws Exception - { - PluginManager manager = new PluginManager(PLUGIN_DIRECTORY); - Map<String, ExchangeType<?>> exchanges = manager.getExchanges(); - assertNotNull("No exchanges found in "+PLUGIN_DIRECTORY, exchanges); - assertEquals("Wrong number of exchanges found in "+PLUGIN_DIRECTORY, - 2, exchanges.size()); - assertNotNull("Wrong exchange found in "+PLUGIN_DIRECTORY, - exchanges.get(TEST_EXCHANGE_CLASS)); - } - - public void testNoExchanges() throws Exception - { - PluginManager manager = new PluginManager("/path/to/nowhere"); - Map<String, ExchangeType<?>> exchanges = manager.getExchanges(); - assertNull("Exchanges found", exchanges); - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java deleted file mode 100644 index 8e7038eec3..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import junit.framework.TestCase; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; - -import javax.management.JMException; - -/** - * Test class to test MBean operations for AMQMinaProtocolSession. - */ -public class AMQProtocolSessionMBeanTest extends TestCase -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class); - - private MessageStore _messageStore = new SkeletonMessageStore(); - private AMQMinaProtocolSession _protocolSession; - private AMQChannel _channel; - private AMQProtocolSessionMBean _mbean; - - public void testChannels() throws Exception - { - // check the channel count is correct - int channelCount = _mbean.channels().size(); - assertTrue(channelCount == 1); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue_" + System.currentTimeMillis()), - false, - new AMQShortString("test"), - true, - _protocolSession.getVirtualHost(), null); - AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore); - channel.setDefaultQueue(queue); - _protocolSession.addChannel(channel); - channelCount = _mbean.channels().size(); - assertTrue(channelCount == 2); - - // general properties test - _mbean.setMaximumNumberOfChannels(1000L); - assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L); - - // check APIs - AMQChannel channel3 = new AMQChannel(_protocolSession, 3, _messageStore); - channel3.setLocalTransactional(); - _protocolSession.addChannel(channel3); - _mbean.rollbackTransactions(2); - _mbean.rollbackTransactions(3); - _mbean.commitTransactions(2); - _mbean.commitTransactions(3); - - // This should throw exception, because the channel does't exist - try - { - _mbean.commitTransactions(4); - fail(); - } - catch (JMException ex) - { - log.debug("expected exception is thrown :" + ex.getMessage()); - } - - // check if closing of session works - _protocolSession.addChannel(new AMQChannel(_protocolSession, 5, _messageStore)); - _mbean.closeConnection(); - try - { - channelCount = _mbean.channels().size(); - assertTrue(channelCount == 0); - // session is now closed so adding another channel should throw an exception - _protocolSession.addChannel(new AMQChannel(_protocolSession, 6, _messageStore)); - fail(); - } - catch (AMQException ex) - { - log.debug("expected exception is thrown :" + ex.getMessage()); - } - } - - @Override - protected void setUp() throws Exception - { - super.setUp(); - - IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - _protocolSession = - new AMQMinaProtocolSession(new MockIoSession(), appRegistry.getVirtualHostRegistry(), new AMQCodecFactory(true), - null); - _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test")); - _channel = new AMQChannel(_protocolSession, 1, _messageStore); - _protocolSession.addChannel(_channel); - _mbean = (AMQProtocolSessionMBean) _protocolSession.getManagedObject(); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java deleted file mode 100644 index 62f5e0c6bf..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import junit.framework.TestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; - -/** Test class to test MBean operations for AMQMinaProtocolSession. */ -public class MaxChannelsTest extends TestCase -{ -// private MessageStore _messageStore = new SkeletonMessageStore(); - - public void testChannels() throws Exception - { - IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); - AMQMinaProtocolSession _protocolSession = new AMQMinaProtocolSession(new MockIoSession(), - appRegistry.getVirtualHostRegistry(), - new AMQCodecFactory(true), - null); - _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test")); - - // check the channel count is correct - int channelCount = _protocolSession.getChannels().size(); - assertEquals("Initial channel count wrong", 0, channelCount); - - long maxChannels = 10L; - _protocolSession.setMaximumNumberOfChannels(maxChannels); - assertEquals("Number of channels not correctly set.", new Long(maxChannels), _protocolSession.getMaximumNumberOfChannels()); - - - try - { - for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++) - { - _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null)); - } - } - catch (AMQException e) - { - assertEquals("Wrong exception recevied.", e.getErrorCode(), AMQConstant.NOT_ALLOWED); - } - assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_protocolSession.getChannels().size())); - } - -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MockIoSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MockIoSession.java deleted file mode 100644 index cf6366b513..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MockIoSession.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.mina.common.*; -import org.apache.mina.common.support.DefaultCloseFuture; -import org.apache.mina.common.support.DefaultWriteFuture; - -import java.net.SocketAddress; -import java.net.InetSocketAddress; -import java.util.Set; - -public class MockIoSession implements IoSession -{ - private AMQProtocolSession _protocolSession; - - /** - * Stores the last response written - */ - private Object _lastWrittenObject; - - private boolean _closing; - - public MockIoSession() - { - } - - public Object getLastWrittenObject() - { - return _lastWrittenObject; - } - - public IoService getService() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public IoServiceConfig getServiceConfig() - { - return null; - } - - public IoHandler getHandler() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public IoSessionConfig getConfig() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public IoFilterChain getFilterChain() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public WriteFuture write(Object message) - { - WriteFuture wf = new DefaultWriteFuture(null); - _lastWrittenObject = message; - return wf; - } - - public CloseFuture close() - { - _closing = true; - CloseFuture cf = new DefaultCloseFuture(null); - cf.setClosed(); - return cf; - } - - public Object getAttachment() - { - return _protocolSession; - } - - public Object setAttachment(Object attachment) - { - Object current = _protocolSession; - _protocolSession = (AMQProtocolSession) attachment; - return current; - } - - public Object getAttribute(String key) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public Object setAttribute(String key, Object value) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public Object setAttribute(String key) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public Object removeAttribute(String key) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean containsAttribute(String key) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public Set getAttributeKeys() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public TransportType getTransportType() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isConnected() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isClosing() - { - return _closing; - } - - public CloseFuture getCloseFuture() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public SocketAddress getRemoteAddress() - { - return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates. - } - - public SocketAddress getLocalAddress() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public SocketAddress getServiceAddress() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getIdleTime(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getIdleTimeInMillis(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setIdleTime(IdleStatus status, int idleTime) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public int getWriteTimeout() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getWriteTimeoutInMillis() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setWriteTimeout(int writeTimeout) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public TrafficMask getTrafficMask() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setTrafficMask(TrafficMask trafficMask) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void suspendRead() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void suspendWrite() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void resumeRead() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void resumeWrite() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public long getReadBytes() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getWrittenBytes() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getReadMessages() - { - return 0L; - } - - public long getWrittenMessages() - { - return 0L; - } - - public long getWrittenWriteRequests() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getScheduledWriteRequests() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getScheduledWriteBytes() - { - return 0; //TODO - } - - public long getCreationTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastIoTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastReadTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastWriteTime() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isIdle(IdleStatus status) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public int getIdleCount(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getLastIdleTime(IdleStatus status) - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java deleted file mode 100644 index 9c2932c5e2..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ /dev/null @@ -1,420 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import junit.framework.TestCase; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; -import org.apache.qpid.server.flow.LimitlessCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; -import org.apache.qpid.server.ack.UnacknowledgedMessageMap; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.util.NullApplicationRegistry; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.Set; -import java.util.Collections; - -/** - * Tests that acknowledgements are handled correctly. - */ -public class AckTest extends TestCase -{ - private static final Logger _log = Logger.getLogger(AckTest.class); - - private Subscription _subscription; - - private MockProtocolSession _protocolSession; - - private TestMemoryMessageStore _messageStore; - - private StoreContext _storeContext = new StoreContext(); - - private AMQChannel _channel; - - private AMQQueue _queue; - - private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag"); - - protected void setUp() throws Exception - { - super.setUp(); - ApplicationRegistry.initialise(new NullApplicationRegistry(), 1); - - _messageStore = new TestMemoryMessageStore(); - _protocolSession = new MockProtocolSession(_messageStore); - _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/); - - _protocolSession.addChannel(_channel); - - _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), - null); - } - - protected void tearDown() - { - ApplicationRegistry.remove(1); - } - - private void publishMessages(int count) throws AMQException - { - publishMessages(count, false); - } - - private void publishMessages(int count, boolean persistent) throws AMQException - { - TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null, - new LinkedList<RequiredDeliveryException>() - ); - _queue.registerSubscription(_subscription,false); - MessageHandleFactory factory = new MessageHandleFactory(); - for (int i = 1; i <= count; i++) - { - // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) - // TODO: Establish some way to determine the version for the test. - MessagePublishInfo publishBody = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return new AMQShortString("someExchange"); - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return new AMQShortString("rk"); - } - }; - IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, txnContext,_protocolSession); - //IncomingMessage msg2 = null; - if (persistent) - { - BasicContentHeaderProperties b = new BasicContentHeaderProperties(); - //This is DeliveryMode.PERSISTENT - b.setDeliveryMode((byte) 2); - ContentHeaderBody cb = new ContentHeaderBody(); - cb.properties = b; - msg.setContentHeaderBody(cb); - } - else - { - msg.setContentHeaderBody(new ContentHeaderBody()); - } - // we increment the reference here since we are not delivering the messaging to any queues, which is where - // the reference is normally incremented. The test is easier to construct if we have direct access to the - // subscription - ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); - qs.add(_queue); - msg.enqueue(qs); - msg.routingComplete(_messageStore, factory); - if(msg.allContentReceived()) - { - msg.deliverToQueues(); - } - // we manually send the message to the subscription - //_subscription.send(new QueueEntry(_queue,msg), _queue); - } - } - - /** - * Tests that the acknowledgements are correctly associated with a channel and - * order is preserved when acks are enabled - */ - public void testAckChannelAssociationTest() throws AMQException - { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount, true); - - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); - - Set<Long> deliveryTagSet = map.getDeliveryTags(); - int i = 1; - for (long deliveryTag : deliveryTagSet) - { - assertTrue(deliveryTag == i); - i++; - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); - } - - assertTrue(map.size() == msgCount); - assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount); - } - - /** - * Tests that in no-ack mode no messages are retained - */ - public void testNoAckMode() throws AMQException - { - // false arg means no acks expected - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount); - - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - assertTrue(_messageStore.getContentBodyMap().size() == 0); - - } - - /** - * Tests that in no-ack mode no messages are retained - */ - public void testPersistentNoAckMode() throws AMQException - { - // false arg means no acks expected - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount, true); - - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 0); - assertTrue(_messageStore.getMessageMetaDataMap().size() == 0); - assertTrue(_messageStore.getContentBodyMap().size() == 0); - - } - - /** - * Tests that a single acknowledgement is handled correctly (i.e multiple flag not - * set case) - */ - public void testSingleAckReceivedTest() throws AMQException - { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount); - - _channel.acknowledgeMessage(5, false); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == msgCount - 1); - - Set<Long> deliveryTagSet = map.getDeliveryTags(); - int i = 1; - for (long deliveryTag : deliveryTagSet) - { - assertTrue(deliveryTag == i); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); - // 5 is the delivery tag of the message that *should* be removed - if (++i == 5) - { - ++i; - } - } - } - - /** - * Tests that a single acknowledgement is handled correctly (i.e multiple flag not - * set case) - */ - public void testMultiAckReceivedTest() throws AMQException - { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount); - - _channel.acknowledgeMessage(5, true); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 5); - - Set<Long> deliveryTagSet = map.getDeliveryTags(); - int i = 1; - for (long deliveryTag : deliveryTagSet) - { - assertTrue(deliveryTag == i + 5); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); - ++i; - } - } - - /** - * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. - */ - public void testMultiAckAllReceivedTest() throws AMQException - { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); - final int msgCount = 10; - publishMessages(msgCount); - - _channel.acknowledgeMessage(0, true); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 0); - - Set<Long> deliveryTagSet = map.getDeliveryTags(); - int i = 1; - for (long deliveryTag : deliveryTagSet) - { - assertTrue(deliveryTag == i + 5); - QueueEntry unackedMsg = map.get(deliveryTag); - assertTrue(unackedMsg.getQueue() == _queue); - ++i; - } - } - - /** - * A regression fixing QPID-1136 showed this up - * - * @throws Exception - */ - public void testMessageDequeueRestoresCreditTest() throws Exception - { - // Send 10 messages - Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); - - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, - DEFAULT_CONSUMER_TAG, true, null, false, creditManager); - final int msgCount = 1; - publishMessages(msgCount); - - _queue.deliverAsync(_subscription); - - _channel.acknowledgeMessage(1, false); - - // Check credit available - assertTrue("No credit available", creditManager.hasCredit()); - - } - - -/* - public void testPrefetchHighLow() throws AMQException - { - int lowMark = 5; - int highMark = 10; - - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); - _channel.setPrefetchLowMarkCount(lowMark); - _channel.setPrefetchHighMarkCount(highMark); - - assertTrue(_channel.getPrefetchLowMarkCount() == lowMark); - assertTrue(_channel.getPrefetchHighMarkCount() == highMark); - - publishMessages(highMark); - - // at this point we should have sent out only highMark messages - // which have not bee received so will be queued up in the channel - // which should be suspended - assertTrue(_subscription.isSuspended()); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == highMark); - - //acknowledge messages so we are just above lowMark - _channel.acknowledgeMessage(lowMark - 1, true); - - //we should still be suspended - assertTrue(_subscription.isSuspended()); - assertTrue(map.size() == lowMark + 1); - - //acknowledge one more message - _channel.acknowledgeMessage(lowMark, true); - - //and suspension should be lifted - assertTrue(!_subscription.isSuspended()); - - //pubilsh more msgs so we are just below the limit - publishMessages(lowMark - 1); - - //we should not be suspended - assertTrue(!_subscription.isSuspended()); - - //acknowledge all messages - _channel.acknowledgeMessage(0, true); - try - { - Thread.sleep(3000); - } - catch (InterruptedException e) - { - _log.error("Error: " + e, e); - } - //map will be empty - assertTrue(map.size() == 0); - } - -*/ -/* - public void testPrefetch() throws AMQException - { - _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager()); - _channel.setMessageCredit(5); - - assertTrue(_channel.getPrefetchCount() == 5); - - final int msgCount = 5; - publishMessages(msgCount); - - // at this point we should have sent out only 5 messages with a further 5 queued - // up in the channel which should now be suspended - assertTrue(_subscription.isSuspended()); - UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); - assertTrue(map.size() == 5); - _channel.acknowledgeMessage(5, true); - assertTrue(!_subscription.isSuspended()); - try - { - Thread.sleep(3000); - } - catch (InterruptedException e) - { - _log.error("Error: " + e, e); - } - assertTrue(map.size() == 0); - } - -*/ - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(AckTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java deleted file mode 100644 index 99c88fac3e..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.framing.*; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.transport.Sender; - -import javax.security.sasl.SaslServer; -import java.util.HashMap; -import java.util.Map; -import java.security.Principal; - -/** - * A protocol session that can be used for testing purposes. - */ -public class MockProtocolSession implements AMQProtocolSession -{ - private MessageStore _messageStore; - - private Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>(); - - public MockProtocolSession(MessageStore messageStore) - { - _messageStore = messageStore; - } - - public void dataBlockReceived(AMQDataBlock message) throws Exception - { - } - - public void writeFrame(AMQDataBlock frame) - { - } - - public AMQShortString getContextKey() - { - return null; - } - - public void setContextKey(AMQShortString contextKey) - { - } - - public AMQChannel getChannel(int channelId) - { - AMQChannel channel = _channelMap.get(channelId); - if (channel == null) - { - throw new IllegalArgumentException("Invalid channel id: " + channelId); - } - else - { - return channel; - } - } - - public void addChannel(AMQChannel channel) - { - if (channel == null) - { - throw new IllegalArgumentException("Channel must not be null"); - } - else - { - _channelMap.put(channel.getChannelId(), channel); - } - } - - public void closeChannel(int channelId) throws AMQException - { - } - - public void closeChannelOk(int channelId) - { - - } - - public boolean channelAwaitingClosure(int channelId) - { - return false; - } - - public void removeChannel(int channelId) - { - _channelMap.remove(channelId); - } - - public void initHeartbeats(int delay) - { - } - - public void closeSession() throws AMQException - { - } - - public void closeConnection(int channelId, AMQConnectionException e, boolean closeIoSession) throws AMQException - { - } - - public Object getKey() - { - return null; - } - - public String getLocalFQDN() - { - return null; - } - - public SaslServer getSaslServer() - { - return null; - } - - public void setSaslServer(SaslServer saslServer) - { - } - - public FieldTable getClientProperties() - { - return null; - } - - public void setClientProperties(FieldTable clientProperties) - { - } - - public Object getClientIdentifier() - { - return null; - } - - public VirtualHost getVirtualHost() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setVirtualHost(VirtualHost virtualHost) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void addSessionCloseTask(Task task) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void removeSessionCloseTask(Task task) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public ProtocolOutputConverter getProtocolOutputConverter() - { - return ProtocolOutputConverterRegistry.getConverter(this); - } - - public void setAuthorizedID(Principal authorizedID) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public Principal getAuthorizedID() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public MethodRegistry getMethodRegistry() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void methodFrameReceived(int channelId, AMQMethodBody body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void contentHeaderReceived(int channelId, ContentHeaderBody body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void contentBodyReceived(int channelId, ContentBody body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void heartbeatBodyReceived(int channelId, HeartbeatBody body) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public MethodDispatcher getMethodDispatcher() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public ProtocolSessionIdentifier getSessionIdentifier() - { - return null; - } - - public byte getProtocolMajorVersion() - { - return getProtocolVersion().getMajorVersion(); - } - - public byte getProtocolMinorVersion() - { - return getProtocolVersion().getMinorVersion(); - } - - - public ProtocolVersion getProtocolVersion() - { - return ProtocolVersion.getLatestSupportedVersion(); //To change body of implemented methods use File | Settings | File Templates. - } - - - public VersionSpecificRegistry getRegistry() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setSender(Sender<java.nio.ByteBuffer> sender) - { - // FIXME AS TODO - - } - - public void init() - { - // TODO Auto-generated method stub - - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java deleted file mode 100644 index f08a15a8a7..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.commons.configuration.Configuration; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; - -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -/** - * A message store that does nothing. Designed to be used in tests that do not want to use any message store - * functionality. - */ -public class SkeletonMessageStore implements MessageStore -{ - private final AtomicLong _messageId = new AtomicLong(1); - - public void configure(String base, Configuration config) throws Exception - { - } - - public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void close() throws Exception - { - } - - public void removeMessage(StoreContext s, Long messageId) - { - } - - public void createExchange(Exchange exchange) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void removeExchange(Exchange exchange) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void createQueue(AMQQueue queue) throws AMQException - { - } - - public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException - { - } - - public void beginTran(StoreContext s) throws AMQException - { - } - - public boolean inTran(StoreContext sc) - { - return false; - } - - public void commitTran(StoreContext storeContext) throws AMQException - { - } - - public void abortTran(StoreContext storeContext) throws AMQException - { - } - - public List<AMQQueue> createQueues() throws AMQException - { - return null; - } - - public Long getNewMessageId() - { - return _messageId.getAndIncrement(); - } - - public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException - { - - } - - public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException - { - - } - - public MessageMetaData getMessageMetaData(StoreContext s,Long messageId) throws AMQException - { - return null; - } - - public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException - { - return null; - } - - public boolean isPersistent() - { - return false; - } - - public void removeQueue(final AMQQueue queue) throws AMQException - { - - } - - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException - { - - } - - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException - { - - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestMemoryMessageStore.java deleted file mode 100644 index 4e48435962..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.abstraction.ContentChunk; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.List; - -/** - * Adds some extra methods to the memory message store for testing purposes. - */ -public class TestMemoryMessageStore extends MemoryMessageStore -{ - public TestMemoryMessageStore() - { - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); - } - - public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() - { - return _metaDataMap; - } - - public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() - { - return _contentBodyMap; - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java deleted file mode 100644 index 2346660d25..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import junit.framework.TestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.MessageHandleFactory; -import org.apache.qpid.server.queue.AMQMessageHandle; - -/** - * Tests that reference counting works correctly with AMQMessage and the message store - */ -public class TestReferenceCounting extends TestCase -{ - private TestMemoryMessageStore _store; - - private StoreContext _storeContext = new StoreContext(); - - - protected void setUp() throws Exception - { - super.setUp(); - _store = new TestMemoryMessageStore(); - } - - /** - * Check that when the reference count is decremented the message removes itself from the store - */ - public void testMessageGetsRemoved() throws AMQException - { - ContentHeaderBody chb = createPersistentContentHeader(); - - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - - final long messageId = _store.getNewMessageId(); - AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info, chb); - AMQMessage message = new AMQMessage(messageHandle, - _storeContext,info); - - message = message.takeReference(); - - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - - - assertEquals(1, _store.getMessageMetaDataMap().size()); - message.decrementReference(_storeContext); - assertEquals(1, _store.getMessageMetaDataMap().size()); - } - - private ContentHeaderBody createPersistentContentHeader() - { - ContentHeaderBody chb = new ContentHeaderBody(); - BasicContentHeaderProperties bchp = new BasicContentHeaderProperties(); - bchp.setDeliveryMode((byte)2); - chb.properties = bchp; - return chb; - } - - public void testMessageRemains() throws AMQException - { - - MessagePublishInfo info = new MessagePublishInfo() - { - - public AMQShortString getExchange() - { - return null; - } - - public void setExchange(AMQShortString exchange) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isImmediate() - { - return false; - } - - public boolean isMandatory() - { - return false; - } - - public AMQShortString getRoutingKey() - { - return null; - } - }; - - final Long messageId = _store.getNewMessageId(); - final ContentHeaderBody chb = createPersistentContentHeader(); - AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true); - messageHandle.setPublishAndContentHeaderBody(_storeContext,info,chb); - AMQMessage message = new AMQMessage(messageHandle, - _storeContext, - info); - - - message = message.takeReference(); - // we call routing complete to set up the handle - // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); - - - - assertEquals(1, _store.getMessageMetaDataMap().size()); - message = message.takeReference(); - message.decrementReference(_storeContext); - assertEquals(1, _store.getMessageMetaDataMap().size()); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TestReferenceCounting.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java deleted file mode 100644 index 84d3d313d1..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.txn; - -import junit.framework.TestCase; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.TestMemoryMessageStore; -import org.apache.qpid.server.store.StoreContext; - -import java.util.LinkedList; -import java.util.NoSuchElementException; - -public class TxnBufferTest extends TestCase -{ - private final LinkedList<MockOp> ops = new LinkedList<MockOp>(); - - public void testCommit() throws AMQException - { - MockStore store = new MockStore(); - - TxnBuffer buffer = new TxnBuffer(); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - //check relative ordering - MockOp op = new MockOp().expectPrepare().expectPrepare().expectCommit().expectCommit(); - buffer.enlist(op); - buffer.enlist(op); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - - buffer.commit(null); - - validateOps(); - store.validate(); - } - - public void testRollback() throws AMQException - { - MockStore store = new MockStore(); - - TxnBuffer buffer = new TxnBuffer(); - buffer.enlist(new MockOp().expectRollback()); - buffer.enlist(new MockOp().expectRollback()); - buffer.enlist(new MockOp().expectRollback()); - - buffer.rollback(null); - - validateOps(); - store.validate(); - } - - public void testCommitWithFailureDuringPrepare() throws AMQException - { - MockStore store = new MockStore(); - store.beginTran(null); - - TxnBuffer buffer = new TxnBuffer(); - buffer.enlist(new StoreMessageOperation(store)); - buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); - buffer.enlist(new TxnTester(store)); - buffer.enlist(new MockOp().expectPrepare().expectUndoPrepare()); - buffer.enlist(new FailedPrepare()); - buffer.enlist(new MockOp()); - - try - { - buffer.commit(null); - } - catch (NoSuchElementException e) - { - - } - - validateOps(); - store.validate(); - } - - public void testCommitWithPersistance() throws AMQException - { - MockStore store = new MockStore(); - store.beginTran(null); - store.expectCommit(); - - TxnBuffer buffer = new TxnBuffer(); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.enlist(new MockOp().expectPrepare().expectCommit()); - buffer.enlist(new StoreMessageOperation(store)); - buffer.enlist(new TxnTester(store)); - - buffer.commit(null); - validateOps(); - store.validate(); - } - - private void validateOps() - { - for (MockOp op : ops) - { - op.validate(); - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TxnBufferTest.class); - } - - class MockOp implements TxnOp - { - final Object PREPARE = "PREPARE"; - final Object COMMIT = "COMMIT"; - final Object UNDO_PREPARE = "UNDO_PREPARE"; - final Object ROLLBACK = "ROLLBACK"; - - private final LinkedList expected = new LinkedList(); - - MockOp() - { - ops.add(this); - } - - public void prepare(StoreContext context) - { - assertEquals(expected.removeLast(), PREPARE); - } - - public void commit(StoreContext context) - { - assertEquals(expected.removeLast(), COMMIT); - } - - public void undoPrepare() - { - assertEquals(expected.removeLast(), UNDO_PREPARE); - } - - public void rollback(StoreContext context) - { - assertEquals(expected.removeLast(), ROLLBACK); - } - - private MockOp expect(Object optype) - { - expected.addFirst(optype); - return this; - } - - MockOp expectPrepare() - { - return expect(PREPARE); - } - - MockOp expectCommit() - { - return expect(COMMIT); - } - - MockOp expectUndoPrepare() - { - return expect(UNDO_PREPARE); - } - - MockOp expectRollback() - { - return expect(ROLLBACK); - } - - void validate() - { - assertEquals("Expected ops were not all invoked", new LinkedList(), expected); - } - - void clear() - { - expected.clear(); - } - } - - class MockStore extends TestMemoryMessageStore - { - final Object BEGIN = "BEGIN"; - final Object ABORT = "ABORT"; - final Object COMMIT = "COMMIT"; - - private final LinkedList expected = new LinkedList(); - private boolean inTran; - - public void beginTran(StoreContext context) throws AMQException - { - inTran = true; - } - - public void commitTran(StoreContext context) throws AMQException - { - assertEquals(expected.removeLast(), COMMIT); - inTran = false; - } - - public void abortTran(StoreContext context) throws AMQException - { - assertEquals(expected.removeLast(), ABORT); - inTran = false; - } - - public boolean inTran(StoreContext context) - { - return inTran; - } - - private MockStore expect(Object optype) - { - expected.addFirst(optype); - return this; - } - - MockStore expectBegin() - { - return expect(BEGIN); - } - - MockStore expectCommit() - { - return expect(COMMIT); - } - - MockStore expectAbort() - { - return expect(ABORT); - } - - void clear() - { - expected.clear(); - } - - void validate() - { - assertEquals("Expected ops were not all invoked", new LinkedList(), expected); - } - } - - class NullOp implements TxnOp - { - public void prepare(StoreContext context) throws AMQException - { - } - public void commit(StoreContext context) - { - } - public void undoPrepare() - { - } - public void rollback(StoreContext context) - { - } - } - - class FailedPrepare extends NullOp - { - public void prepare() throws AMQException - { - throw new AMQException(null, "Fail!", null); - } - } - - class TxnTester extends NullOp - { - private final MessageStore store; - - private final StoreContext context = new StoreContext(); - - TxnTester(MessageStore store) - { - this.store = store; - } - - public void prepare() throws AMQException - { - assertTrue("Expected prepare to be performed under txn", store.inTran(context)); - } - - public void commit() - { - assertTrue("Expected commit not to be performed under txn", !store.inTran(context)); - } - } - -} |
