diff options
Diffstat (limited to 'qpid/java/broker/src/test')
4 files changed, 565 insertions, 0 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java new file mode 100644 index 0000000000..51012bc776 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -0,0 +1,173 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.AMQChannel; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter +{ + // ChannelID(LIST) -> LinkedList<Pair> + final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers; + private AtomicInteger _deliveryCount = new AtomicInteger(0); + + public InternalTestProtocolSession() throws AMQException + { + super(new TestIoSession(), + ApplicationRegistry.getInstance().getVirtualHostRegistry(), + new AMQCodecFactory(true)); + + _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); + + } + + public ProtocolOutputConverter getProtocolOutputConverter() + { + return this; + } + + public byte getProtocolMajorVersion() + { + return (byte) 8; + } + + public byte getProtocolMinorVersion() + { + return (byte) 0; + } + + // *** + + public List<DeliveryPair> getDelivers(int channelId, AMQShortString consumerTag, int count) + { + synchronized (_channelDelivers) + { + List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count); + + List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs); + + //Remove the msgs from the receivedList. + msgs.clear(); + + return response; + } + } + + // *** ProtocolOutputConverter Implementation + public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException + { + } + + public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag) + { + } + + public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException + { + _deliveryCount.incrementAndGet(); + + synchronized (_channelDelivers) + { + Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId); + + if (consumers == null) + { + consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>(); + _channelDelivers.put(channelId, consumers); + } + + LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag); + + if (consumerDelivers == null) + { + consumerDelivers = new LinkedList<DeliveryPair>(); + consumers.put(consumerTag, consumerDelivers); + } + + consumerDelivers.add(new DeliveryPair(deliveryTag, message)); + } + } + + public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException + { + } + + public void awaitDelivery(int msgs) + { + while (msgs > _deliveryCount.get()) + { + try + { + Thread.sleep(100); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + } + + public class DeliveryPair + { + private long _deliveryTag; + private AMQMessage _message; + + public DeliveryPair(long deliveryTag, AMQMessage message) + { + _deliveryTag = deliveryTag; + _message = message; + } + + public AMQMessage getMessage() + { + return _message; + } + + public long getDeliveryTag() + { + return _deliveryTag; + } + } + + public boolean isClosed() + { + return _closed; + } + + public void closeProtocolSession(boolean waitLast) + { + // Override as we don't have a real IOSession to close. + // The alternative is to fully implement the TestIOSession to return a CloseFuture from close(); + // Then the AMQMinaProtocolSession can join on the returning future without a NPE. + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java new file mode 100644 index 0000000000..a695a67eea --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; + +import java.util.List; + +public class MessageStoreShutdownTest extends InternalBrokerBaseCase +{ + + public void test() + { + subscribe(_session, _channel, _queue); + + try + { + publishMessages(_session, _channel, 1); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + fail(e.getMessage()); + } + + try + { + _registry.close(); + } + catch (Exception e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + fail(e.getMessage()); + } + + assertTrue("Session should now be closed", _session.isClosed()); + + + //Test attempting to modify the broker state after session has been closed. + + //The Message should have been removed from the unacked list. + + //Ack Messages + List<InternalTestProtocolSession.DeliveryPair> list = _session.getDelivers(_channel.getChannelId(), new AMQShortString("sgen_1"), 1); + + InternalTestProtocolSession.DeliveryPair pair = list.get(0); + + try + { + // The message should now be requeued and so unable to ack it. + _channel.acknowledgeMessage(pair.getDeliveryTag(), false); + } + catch (AMQException e) + { + assertEquals("Incorrect exception thrown", "Single ack on delivery tag 1 not known for channel:1", e.getMessage()); + } + + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java new file mode 100644 index 0000000000..c6cd5da01d --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import junit.framework.TestCase; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.ConsumerTagNotUniqueException; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; + +public class InternalBrokerBaseCase extends TestCase +{ + protected IApplicationRegistry _registry; + protected MessageStore _messageStore; + protected AMQChannel _channel; + protected InternalTestProtocolSession _session; + protected VirtualHost _virtualHost; + protected StoreContext _storeContext = new StoreContext(); + protected AMQQueue _queue; + protected AMQShortString QUEUE_NAME; + + public void setUp() throws Exception + { + super.setUp(); + _registry = new TestApplicationRegistry(); + ApplicationRegistry.initialise(_registry); + _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test"); + _messageStore = _virtualHost.getMessageStore(); + + QUEUE_NAME = new AMQShortString("test"); + _queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"), + false, _virtualHost, null); + + _virtualHost.getQueueRegistry().registerQueue(_queue); + + Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); + + _queue.bind(defaultExchange, QUEUE_NAME, null); + + _session = new InternalTestProtocolSession(); + + _session.setVirtualHost(_virtualHost); + + _channel = new AMQChannel(_session, 1, _messageStore); + + _session.addChannel(_channel); + } + + public void tearDown() throws Exception + { + ApplicationRegistry.removeAll(); + super.tearDown(); + } + + protected void checkStoreContents(int messageCount) + { + assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size()); + + //The above publish message is sufficiently small not to fit in the header so no Body is required. + //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size()); + } + + protected AMQShortString subscribe(InternalTestProtocolSession session, AMQChannel channel, AMQQueue queue) + { + try + { + return channel.subscribeToQueue(null, queue, true, null, false, true); + } + catch (AMQException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + catch (ConsumerTagNotUniqueException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + //Keep the compiler happy + return null; + } + + public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException + { + MessagePublishInfo info = new MessagePublishInfo() + { + public AMQShortString getExchange() + { + return ExchangeDefaults.DEFAULT_EXCHANGE_NAME; + } + + public void setExchange(AMQShortString exchange) + { + + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return QUEUE_NAME; + } + }; + + for (int count = 0; count < messages; count++) + { + channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange())); + + //Set the body size + ContentHeaderBody _headerBody = new ContentHeaderBody(); + _headerBody.bodySize = 0; + + //Set Minimum properties + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + + properties.setExpiration(0L); + properties.setTimestamp(System.currentTimeMillis()); + + //Make Message Persistent + properties.setDeliveryMode((byte) 2); + + _headerBody.properties = properties; + + channel.publishContentHeader(_headerBody); + } + + } + + public void acknowledge(AMQChannel channel, long deliveryTag) + { + try + { + channel.acknowledgeMessage(deliveryTag, false); + } + catch (AMQException e) + { + e.printStackTrace(); + fail(e.getMessage()); + } + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java new file mode 100644 index 0000000000..471912c85a --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.MapConfiguration; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.ManagedObjectRegistry; +import org.apache.qpid.server.management.NoopManagedObjectRegistry; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.plugins.AllowAll; +import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Properties; +import java.util.Arrays; + +public class TestApplicationRegistry extends ApplicationRegistry +{ + private QueueRegistry _queueRegistry; + + private ExchangeRegistry _exchangeRegistry; + + private ExchangeFactory _exchangeFactory; + + private MessageStore _messageStore; + + private VirtualHost _vHost; + + + public TestApplicationRegistry() + { + super(new MapConfiguration(new HashMap())); + } + + public void initialise() throws Exception + { + Properties users = new Properties(); + + users.put("guest", "guest"); + + _databaseManager = new PropertiesPrincipalDatabaseManager("default", users); + + _accessManager = new AllowAll(); + + _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); + + _managedObjectRegistry = new NoopManagedObjectRegistry(); + + _messageStore = new TestableMemoryMessageStore(); + + _virtualHostRegistry = new VirtualHostRegistry(); + + _vHost = new VirtualHost("test", _messageStore); + + _virtualHostRegistry.registerVirtualHost(_vHost); + + _queueRegistry = _vHost.getQueueRegistry(); + _exchangeFactory = _vHost.getExchangeFactory(); + _exchangeRegistry = _vHost.getExchangeRegistry(); + + _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public Collection<String> getVirtualHostNames() + { + String[] hosts = {"test"}; + return Arrays.asList(hosts); + } + + public void setAccessManager(ACLPlugin newManager) + { + _accessManager = newManager; + } + + public MessageStore getMessageStore() + { + return _messageStore; + } + +} + + |
