summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java173
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java81
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java183
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java128
4 files changed, 565 insertions, 0 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
new file mode 100644
index 0000000000..51012bc776
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.AMQChannel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
+{
+ // ChannelID(LIST) -> LinkedList<Pair>
+ final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
+ private AtomicInteger _deliveryCount = new AtomicInteger(0);
+
+ public InternalTestProtocolSession() throws AMQException
+ {
+ super(new TestIoSession(),
+ ApplicationRegistry.getInstance().getVirtualHostRegistry(),
+ new AMQCodecFactory(true));
+
+ _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+
+ }
+
+ public ProtocolOutputConverter getProtocolOutputConverter()
+ {
+ return this;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return (byte) 8;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return (byte) 0;
+ }
+
+ // ***
+
+ public List<DeliveryPair> getDelivers(int channelId, AMQShortString consumerTag, int count)
+ {
+ synchronized (_channelDelivers)
+ {
+ List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count);
+
+ List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
+
+ //Remove the msgs from the receivedList.
+ msgs.clear();
+
+ return response;
+ }
+ }
+
+ // *** ProtocolOutputConverter Implementation
+ public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ {
+ }
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+ }
+
+ public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+ {
+ _deliveryCount.incrementAndGet();
+
+ synchronized (_channelDelivers)
+ {
+ Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
+
+ if (consumers == null)
+ {
+ consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ _channelDelivers.put(channelId, consumers);
+ }
+
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
+
+ if (consumerDelivers == null)
+ {
+ consumerDelivers = new LinkedList<DeliveryPair>();
+ consumers.put(consumerTag, consumerDelivers);
+ }
+
+ consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+ }
+ }
+
+ public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ }
+
+ public void awaitDelivery(int msgs)
+ {
+ while (msgs > _deliveryCount.get())
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public class DeliveryPair
+ {
+ private long _deliveryTag;
+ private AMQMessage _message;
+
+ public DeliveryPair(long deliveryTag, AMQMessage message)
+ {
+ _deliveryTag = deliveryTag;
+ _message = message;
+ }
+
+ public AMQMessage getMessage()
+ {
+ return _message;
+ }
+
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+ }
+
+ public boolean isClosed()
+ {
+ return _closed;
+ }
+
+ public void closeProtocolSession(boolean waitLast)
+ {
+ // Override as we don't have a real IOSession to close.
+ // The alternative is to fully implement the TestIOSession to return a CloseFuture from close();
+ // Then the AMQMinaProtocolSession can join on the returning future without a NPE.
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java
new file mode 100644
index 0000000000..a695a67eea
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreShutdownTest.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+import java.util.List;
+
+public class MessageStoreShutdownTest extends InternalBrokerBaseCase
+{
+
+ public void test()
+ {
+ subscribe(_session, _channel, _queue);
+
+ try
+ {
+ publishMessages(_session, _channel, 1);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ fail(e.getMessage());
+ }
+
+ try
+ {
+ _registry.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ fail(e.getMessage());
+ }
+
+ assertTrue("Session should now be closed", _session.isClosed());
+
+
+ //Test attempting to modify the broker state after session has been closed.
+
+ //The Message should have been removed from the unacked list.
+
+ //Ack Messages
+ List<InternalTestProtocolSession.DeliveryPair> list = _session.getDelivers(_channel.getChannelId(), new AMQShortString("sgen_1"), 1);
+
+ InternalTestProtocolSession.DeliveryPair pair = list.get(0);
+
+ try
+ {
+ // The message should now be requeued and so unable to ack it.
+ _channel.acknowledgeMessage(pair.getDeliveryTag(), false);
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Incorrect exception thrown", "Single ack on delivery tag 1 not known for channel:1", e.getMessage());
+ }
+
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
new file mode 100644
index 0000000000..c6cd5da01d
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+public class InternalBrokerBaseCase extends TestCase
+{
+ protected IApplicationRegistry _registry;
+ protected MessageStore _messageStore;
+ protected AMQChannel _channel;
+ protected InternalTestProtocolSession _session;
+ protected VirtualHost _virtualHost;
+ protected StoreContext _storeContext = new StoreContext();
+ protected AMQQueue _queue;
+ protected AMQShortString QUEUE_NAME;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _registry = new TestApplicationRegistry();
+ ApplicationRegistry.initialise(_registry);
+ _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = _virtualHost.getMessageStore();
+
+ QUEUE_NAME = new AMQShortString("test");
+ _queue = AMQQueueFactory.createAMQQueueImpl(QUEUE_NAME, false, new AMQShortString("testowner"),
+ false, _virtualHost, null);
+
+ _virtualHost.getQueueRegistry().registerQueue(_queue);
+
+ Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
+ _queue.bind(defaultExchange, QUEUE_NAME, null);
+
+ _session = new InternalTestProtocolSession();
+
+ _session.setVirtualHost(_virtualHost);
+
+ _channel = new AMQChannel(_session, 1, _messageStore);
+
+ _session.addChannel(_channel);
+ }
+
+ public void tearDown() throws Exception
+ {
+ ApplicationRegistry.removeAll();
+ super.tearDown();
+ }
+
+ protected void checkStoreContents(int messageCount)
+ {
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size());
+
+ //The above publish message is sufficiently small not to fit in the header so no Body is required.
+ //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());
+ }
+
+ protected AMQShortString subscribe(InternalTestProtocolSession session, AMQChannel channel, AMQQueue queue)
+ {
+ try
+ {
+ return channel.subscribeToQueue(null, queue, true, null, false, true);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ catch (ConsumerTagNotUniqueException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ //Keep the compiler happy
+ return null;
+ }
+
+ public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException
+ {
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+ public AMQShortString getExchange()
+ {
+ return ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return QUEUE_NAME;
+ }
+ };
+
+ for (int count = 0; count < messages; count++)
+ {
+ channel.setPublishFrame(info, _virtualHost.getExchangeRegistry().getExchange(info.getExchange()));
+
+ //Set the body size
+ ContentHeaderBody _headerBody = new ContentHeaderBody();
+ _headerBody.bodySize = 0;
+
+ //Set Minimum properties
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+
+ properties.setExpiration(0L);
+ properties.setTimestamp(System.currentTimeMillis());
+
+ //Make Message Persistent
+ properties.setDeliveryMode((byte) 2);
+
+ _headerBody.properties = properties;
+
+ channel.publishContentHeader(_headerBody);
+ }
+
+ }
+
+ public void acknowledge(AMQChannel channel, long deliveryTag)
+ {
+ try
+ {
+ channel.acknowledgeMessage(deliveryTag, false);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
new file mode 100644
index 0000000000..471912c85a
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.Arrays;
+
+public class TestApplicationRegistry extends ApplicationRegistry
+{
+ private QueueRegistry _queueRegistry;
+
+ private ExchangeRegistry _exchangeRegistry;
+
+ private ExchangeFactory _exchangeFactory;
+
+ private MessageStore _messageStore;
+
+ private VirtualHost _vHost;
+
+
+ public TestApplicationRegistry()
+ {
+ super(new MapConfiguration(new HashMap()));
+ }
+
+ public void initialise() throws Exception
+ {
+ Properties users = new Properties();
+
+ users.put("guest", "guest");
+
+ _databaseManager = new PropertiesPrincipalDatabaseManager("default", users);
+
+ _accessManager = new AllowAll();
+
+ _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
+
+ _managedObjectRegistry = new NoopManagedObjectRegistry();
+
+ _messageStore = new TestableMemoryMessageStore();
+
+ _virtualHostRegistry = new VirtualHostRegistry();
+
+ _vHost = new VirtualHost("test", _messageStore);
+
+ _virtualHostRegistry.registerVirtualHost(_vHost);
+
+ _queueRegistry = _vHost.getQueueRegistry();
+ _exchangeFactory = _vHost.getExchangeFactory();
+ _exchangeRegistry = _vHost.getExchangeRegistry();
+
+ _configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
+ }
+
+ public QueueRegistry getQueueRegistry()
+ {
+ return _queueRegistry;
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return _exchangeRegistry;
+ }
+
+ public ExchangeFactory getExchangeFactory()
+ {
+ return _exchangeFactory;
+ }
+
+ public Collection<String> getVirtualHostNames()
+ {
+ String[] hosts = {"test"};
+ return Arrays.asList(hosts);
+ }
+
+ public void setAccessManager(ACLPlugin newManager)
+ {
+ _accessManager = newManager;
+ }
+
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+}
+
+