summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-13 18:43:28 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-13 18:43:28 +0000
commit2e24b12c60f054db7c37287dbe50c92769923416 (patch)
treecdc76b904c6a6f8513b0ca08c12fa1cc801bcc5b /qpid/java/broker/src/test
parentce70d9dec7f82513d3d421f97c0a446987fd8107 (diff)
downloadqpid-python-2e24b12c60f054db7c37287dbe50c92769923416.tar.gz
QPID-4983 : [Java Broker] Move store implementations to broker plugins
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1502835 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java948
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java34
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java54
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java11
-rw-r--r--qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory19
10 files changed, 123 insertions, 962 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
index 616ee74b2d..9756cdfd55 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
@@ -31,8 +31,8 @@ import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryIterator;
import org.apache.qpid.server.queue.SimpleQueueEntryList;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
@@ -63,7 +63,7 @@ public class ExtractResendAndRequeueTest extends TestCase
private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
private AMQQueue _queue = new MockAMQQueue(getName());
- private MessageStore _messageStore = new MemoryMessageStore();
+ private MessageStore _messageStore = new TestMemoryMessageStore();
private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
@Override
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
index 042abca9c4..f00d12b77d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
@@ -78,7 +78,7 @@ public class VirtualHostRecovererTest extends TestCase
attributes.put(VirtualHost.NAME, getName());
attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
- attributes.put(VirtualHost.STORE_TYPE, "MEMORY");
+ attributes.put(VirtualHost.STORE_TYPE, "TESTMEMORY");
when(entry.getAttributes()).thenReturn(attributes);
VirtualHost host = recoverer.create(null, entry, parent);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index f1bf632235..3ee2345cee 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -35,8 +35,8 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -56,7 +56,7 @@ public class TopicExchangeTest extends QpidTestCase
BrokerTestHelper.setUp();
_exchange = new TopicExchange();
_vhost = BrokerTestHelper.createVirtualHost(getName());
- _store = new MemoryMessageStore();
+ _store = new TestMemoryMessageStore();
}
@Override
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index 05d5d75864..ce213ee582 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -38,9 +38,8 @@ import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.StandardVirtualHost;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
public class VirtualHostTest extends TestCase
@@ -92,7 +91,7 @@ public class VirtualHostTest extends TestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, getName());
attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
- attributes.put(VirtualHost.STORE_TYPE, MemoryMessageStore.TYPE);
+ attributes.put(VirtualHost.STORE_TYPE, TestMemoryMessageStore.TYPE);
attributes.put(VirtualHost.STATE, State.QUIESCED);
VirtualHost host = createHost(attributes);
@@ -131,7 +130,7 @@ public class VirtualHostTest extends TestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(VirtualHost.NAME, getName());
attributes.put(VirtualHost.TYPE, StandardVirtualHostFactory.TYPE);
- attributes.put(VirtualHost.STORE_TYPE, MemoryMessageStore.TYPE);
+ attributes.put(VirtualHost.STORE_TYPE, TestMemoryMessageStore.TYPE);
VirtualHost host = createHost(attributes);
return host;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
deleted file mode 100644
index 8b678c4eb4..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ /dev/null
@@ -1,948 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.TopicExchange;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.ConflationQueue;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.util.FileUtils;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * This tests the MessageStores by using the available interfaces.
- *
- * For persistent stores, it validates that Exchanges, Queues, Bindings and
- * Messages are persisted and recovered correctly.
- */
-public class MessageStoreTest extends QpidTestCase
-{
- public static final int DEFAULT_PRIORTY_LEVEL = 5;
- public static final String SELECTOR_VALUE = "Test = 'MST'";
- public static final String LVQ_KEY = "MST-LVQ-KEY";
-
- private String nonDurableExchangeName = "MST-NonDurableDirectExchange";
- private String directExchangeName = "MST-DirectExchange";
- private String topicExchangeName = "MST-TopicExchange";
-
- private AMQShortString durablePriorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue-Durable");
- private AMQShortString durableTopicQueueName = new AMQShortString("MST-TopicQueue-Durable");
- private AMQShortString priorityTopicQueueName = new AMQShortString("MST-PriorityTopicQueue");
- private AMQShortString topicQueueName = new AMQShortString("MST-TopicQueue");
-
- private AMQShortString durableExclusiveQueueName = new AMQShortString("MST-Queue-Durable-Exclusive");
- private AMQShortString durablePriorityQueueName = new AMQShortString("MST-PriorityQueue-Durable");
- private AMQShortString durableLastValueQueueName = new AMQShortString("MST-LastValueQueue-Durable");
- private AMQShortString durableQueueName = new AMQShortString("MST-Queue-Durable");
- private AMQShortString priorityQueueName = new AMQShortString("MST-PriorityQueue");
- private AMQShortString queueName = new AMQShortString("MST-Queue");
-
- private AMQShortString directRouting = new AMQShortString("MST-direct");
- private AMQShortString topicRouting = new AMQShortString("MST-topic");
-
- private AMQShortString queueOwner = new AMQShortString("MST");
-
- private PropertiesConfiguration _config;
-
- private VirtualHost _virtualHost;
- private org.apache.qpid.server.model.VirtualHost _virtualHostModel;
- private Broker _broker;
- private String _storePath;
-
- public void setUp() throws Exception
- {
- super.setUp();
- BrokerTestHelper.setUp();
-
- _storePath = System.getProperty("QPID_WORK") + File.separator + getName();
-
- _config = new PropertiesConfiguration();
- _config.addProperty("store.class", getTestProfileMessageStoreClassName());
- _config.addProperty("store.environment-path", _storePath);
- _virtualHostModel = mock(org.apache.qpid.server.model.VirtualHost.class);
- when(_virtualHostModel.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_PATH))).thenReturn(_storePath);
-
-
-
- cleanup(new File(_storePath));
-
- _broker = BrokerTestHelper.createBrokerMock();
-
- reloadVirtualHost();
- }
-
- protected String getStorePath()
- {
- return _storePath;
- }
-
- protected org.apache.qpid.server.model.VirtualHost getVirtualHostModel()
- {
- return _virtualHostModel;
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- if (_virtualHost != null)
- {
- _virtualHost.close();
- }
- }
- finally
- {
- BrokerTestHelper.tearDown();
- super.tearDown();
- }
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- public PropertiesConfiguration getConfig()
- {
- return _config;
- }
-
- protected void reloadVirtualHost()
- {
- VirtualHost original = getVirtualHost();
-
- if (getVirtualHost() != null)
- {
- try
- {
- getVirtualHost().close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- try
- {
- _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getClass().getName(), _config, _broker),null,getVirtualHostModel());
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- assertTrue("Virtualhost has not changed, reload was not successful", original != getVirtualHost());
- }
-
- /**
- * Old MessageStoreTest segment which runs against both persistent and non-persistent stores
- * creating queues, exchanges and bindings and then verifying message delivery to them.
- */
- public void testQueueExchangeAndBindingCreation() throws Exception
- {
- assertEquals("Should not be any existing queues", 0, getVirtualHost().getQueueRegistry().getQueues().size());
-
- createAllQueues();
- createAllTopicQueues();
-
- //Register Non-Durable DirectExchange
- Exchange nonDurableExchange = createExchange(DirectExchange.TYPE, nonDurableExchangeName, false);
- bindAllQueuesToExchange(nonDurableExchange, directRouting);
-
- //Register DirectExchange
- Exchange directExchange = createExchange(DirectExchange.TYPE, directExchangeName, true);
- bindAllQueuesToExchange(directExchange, directRouting);
-
- //Register TopicExchange
- Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
- bindAllTopicQueuesToExchange(topicExchange, topicRouting);
-
- //Send Message To NonDurable direct Exchange = persistent
- sendMessageOnExchange(nonDurableExchange, directRouting, true);
- // and non-persistent
- sendMessageOnExchange(nonDurableExchange, directRouting, false);
-
- //Send Message To direct Exchange = persistent
- sendMessageOnExchange(directExchange, directRouting, true);
- // and non-persistent
- sendMessageOnExchange(directExchange, directRouting, false);
-
- //Send Message To topic Exchange = persistent
- sendMessageOnExchange(topicExchange, topicRouting, true);
- // and non-persistent
- sendMessageOnExchange(topicExchange, topicRouting, false);
-
- //Ensure all the Queues have four messages (one transient, one persistent) x 2 exchange routings
- validateMessageOnQueues(4, true);
- //Ensure all the topics have two messages (one transient, one persistent)
- validateMessageOnTopics(2, true);
-
- assertEquals("Not all queues correctly registered",
- 10, getVirtualHost().getQueueRegistry().getQueues().size());
- }
-
- /**
- * Tests message persistence by running the testQueueExchangeAndBindingCreation() method above
- * before reloading the virtual host and ensuring that the persistent messages were restored.
- *
- * More specific testing of message persistence is left to store-specific unit testing.
- */
- public void testMessagePersistence() throws Exception
- {
- testQueueExchangeAndBindingCreation();
-
- reloadVirtualHost();
-
- //Validate durable queues and subscriptions still have the persistent messages
- validateMessageOnQueues(2, false);
- validateMessageOnTopics(1, false);
- }
-
- /**
- * Tests message removal by running the testMessagePersistence() method above before
- * clearing the queues, reloading the virtual host, and ensuring that the persistent
- * messages were removed from the queues.
- */
- public void testMessageRemoval() throws Exception
- {
- testMessagePersistence();
-
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- assertEquals("Incorrect number of queues registered after recovery",
- 6, queueRegistry.getQueues().size());
-
- //clear the queue
- queueRegistry.getQueue(durableQueueName).clearQueue();
-
- //check the messages are gone
- validateMessageOnQueue(durableQueueName, 0);
-
- //reload and verify messages arent restored
- reloadVirtualHost();
-
- validateMessageOnQueue(durableQueueName, 0);
- }
-
- /**
- * Tests queue persistence by creating a selection of queues with differing properties, both
- * durable and non durable, and ensuring that following the recovery process the correct queues
- * are present and any property manipulations (eg queue exclusivity) are correctly recovered.
- */
- public void testQueuePersistence() throws Exception
- {
- assertEquals("Should not be any existing queues",
- 0, getVirtualHost().getQueueRegistry().getQueues().size());
-
- //create durable and non durable queues/topics
- createAllQueues();
- createAllTopicQueues();
-
- //reload the virtual host, prompting recovery of the queues/topics
- reloadVirtualHost();
-
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- assertEquals("Incorrect number of queues registered after recovery",
- 6, queueRegistry.getQueues().size());
-
- //Validate the non-Durable Queues were not recovered.
- assertNull("Non-Durable queue still registered:" + priorityQueueName,
- queueRegistry.getQueue(priorityQueueName));
- assertNull("Non-Durable queue still registered:" + queueName,
- queueRegistry.getQueue(queueName));
- assertNull("Non-Durable queue still registered:" + priorityTopicQueueName,
- queueRegistry.getQueue(priorityTopicQueueName));
- assertNull("Non-Durable queue still registered:" + topicQueueName,
- queueRegistry.getQueue(topicQueueName));
-
- //Validate normally expected properties of Queues/Topics
- validateDurableQueueProperties();
-
- //Update the durable exclusive queue's exclusivity
- setQueueExclusivity(false);
- validateQueueExclusivityProperty(false);
- }
-
- /**
- * Tests queue removal by creating a durable queue, verifying it recovers, and
- * then removing it from the store, and ensuring that following the second reload
- * process it is not recovered.
- */
- public void testDurableQueueRemoval() throws Exception
- {
- //Register Durable Queue
- createQueue(durableQueueName, false, true, false, false);
-
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
- assertEquals("Incorrect number of queues registered before recovery",
- 1, queueRegistry.getQueues().size());
-
- reloadVirtualHost();
-
- queueRegistry = getVirtualHost().getQueueRegistry();
- assertEquals("Incorrect number of queues registered after first recovery",
- 1, queueRegistry.getQueues().size());
-
- //test that removing the queue means it is not recovered next time
- final AMQQueue queue = queueRegistry.getQueue(durableQueueName);
- DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue);
-
- reloadVirtualHost();
-
- queueRegistry = getVirtualHost().getQueueRegistry();
- assertEquals("Incorrect number of queues registered after second recovery",
- 0, queueRegistry.getQueues().size());
- assertNull("Durable queue was not removed:" + durableQueueName,
- queueRegistry.getQueue(durableQueueName));
- }
-
- /**
- * Tests exchange persistence by creating a selection of exchanges, both durable
- * and non durable, and ensuring that following the recovery process the correct
- * durable exchanges are still present.
- */
- public void testExchangePersistence() throws Exception
- {
- int origExchangeCount = getVirtualHost().getExchanges().size();
-
- Map<String, Exchange> oldExchanges = createExchanges();
-
- assertEquals("Incorrect number of exchanges registered before recovery",
- origExchangeCount + 3, getVirtualHost().getExchanges().size());
-
- reloadVirtualHost();
-
- //verify the exchanges present after recovery
- validateExchanges(origExchangeCount, oldExchanges);
- }
-
- /**
- * Tests exchange removal by creating a durable exchange, verifying it recovers, and
- * then removing it from the store, and ensuring that following the second reload
- * process it is not recovered.
- */
- public void testDurableExchangeRemoval() throws Exception
- {
- int origExchangeCount = getVirtualHost().getExchanges().size();
-
- createExchange(DirectExchange.TYPE, directExchangeName, true);
-
- assertEquals("Incorrect number of exchanges registered before recovery",
- origExchangeCount + 1, getVirtualHost().getExchanges().size());
-
- reloadVirtualHost();
-
- assertEquals("Incorrect number of exchanges registered after first recovery",
- origExchangeCount + 1, getVirtualHost().getExchanges().size());
-
- //test that removing the exchange means it is not recovered next time
- final Exchange exchange = getVirtualHost().getExchange(directExchangeName);
- DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange);
-
- reloadVirtualHost();
-
- assertEquals("Incorrect number of exchanges registered after second recovery",
- origExchangeCount, getVirtualHost().getExchanges().size());
- assertNull("Durable exchange was not removed:" + directExchangeName,
- getVirtualHost().getExchange(directExchangeName));
- }
-
- /**
- * Tests binding persistence by creating a selection of queues and exchanges, both durable
- * and non durable, then adding bindings with and without selectors before reloading the
- * virtual host and verifying that following the recovery process the correct durable
- * bindings (those for durable queues to durable exchanges) are still present.
- */
- public void testBindingPersistence() throws Exception
- {
- int origExchangeCount = getVirtualHost().getExchanges().size();
-
- createAllQueues();
- createAllTopicQueues();
-
- Map<String, Exchange> exchanges = createExchanges();
-
- Exchange nonDurableExchange = exchanges.get(nonDurableExchangeName);
- Exchange directExchange = exchanges.get(directExchangeName);
- Exchange topicExchange = exchanges.get(topicExchangeName);
-
- bindAllQueuesToExchange(nonDurableExchange, directRouting);
- bindAllQueuesToExchange(directExchange, directRouting);
- bindAllTopicQueuesToExchange(topicExchange, topicRouting);
-
- assertEquals("Incorrect number of exchanges registered before recovery",
- origExchangeCount + 3, getVirtualHost().getExchanges().size());
-
- reloadVirtualHost();
-
- validateExchanges(origExchangeCount, exchanges);
-
- validateBindingProperties();
- }
-
- /**
- * Tests binding removal by creating a durable exchange, and queue, binding them together,
- * recovering to verify the persistence, then removing it from the store, and ensuring
- * that following the second reload process it is not recovered.
- */
- public void testDurableBindingRemoval() throws Exception
- {
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- //create durable queue and exchange, bind them
- Exchange exch = createExchange(DirectExchange.TYPE, directExchangeName, true);
- createQueue(durableQueueName, false, true, false, false);
- bindQueueToExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
-
- assertEquals("Incorrect number of bindings registered before recovery",
- 1, queueRegistry.getQueue(durableQueueName).getBindings().size());
-
- //verify binding is actually normally recovered
- reloadVirtualHost();
-
- queueRegistry = getVirtualHost().getQueueRegistry();
- assertEquals("Incorrect number of bindings registered after first recovery",
- 1, queueRegistry.getQueue(durableQueueName).getBindings().size());
-
- exch = getVirtualHost().getExchange(directExchangeName);
- assertNotNull("Exchange was not recovered", exch);
-
- //remove the binding and verify result after recovery
- unbindQueueFromExchange(exch, directRouting, queueRegistry.getQueue(durableQueueName), false, null);
-
- reloadVirtualHost();
-
- queueRegistry = getVirtualHost().getQueueRegistry();
- assertEquals("Incorrect number of bindings registered after second recovery",
- 0, queueRegistry.getQueue(durableQueueName).getBindings().size());
- }
-
- /**
- * Validates that the durable exchanges are still present, the non durable exchange is not,
- * and that the new exchanges are not the same objects as the provided list (i.e. that the
- * reload actually generated new exchange objects)
- */
- private void validateExchanges(int originalNumExchanges, Map<String, Exchange> oldExchanges)
- {
- Collection<Exchange> exchanges = getVirtualHost().getExchanges();
- Collection<String> exchangeNames = new ArrayList(exchanges.size());
- for(Exchange exchange : exchanges)
- {
- exchangeNames.add(exchange.getName());
- }
- assertTrue(directExchangeName + " exchange NOT reloaded",
- exchangeNames.contains(directExchangeName));
- assertTrue(topicExchangeName + " exchange NOT reloaded",
- exchangeNames.contains(topicExchangeName));
- assertTrue(nonDurableExchangeName + " exchange reloaded",
- !exchangeNames.contains(nonDurableExchangeName));
-
- //check the old exchange objects are not the same as the new exchanges
- assertTrue(directExchangeName + " exchange NOT reloaded",
- getVirtualHost().getExchange(directExchangeName) != oldExchanges.get(directExchangeName));
- assertTrue(topicExchangeName + " exchange NOT reloaded",
- getVirtualHost().getExchange(topicExchangeName) != oldExchanges.get(topicExchangeName));
-
- // There should only be the original exchanges + our 2 recovered durable exchanges
- assertEquals("Incorrect number of exchanges available",
- originalNumExchanges + 2, getVirtualHost().getExchanges().size());
- }
-
- /** Validates the Durable queues and their properties are as expected following recovery */
- private void validateBindingProperties()
- {
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- assertEquals("Incorrect number of (durable) queues following recovery", 6, queueRegistry.getQueues().size());
-
- validateBindingProperties(queueRegistry.getQueue(durablePriorityQueueName).getBindings(), false);
- validateBindingProperties(queueRegistry.getQueue(durablePriorityTopicQueueName).getBindings(), true);
- validateBindingProperties(queueRegistry.getQueue(durableQueueName).getBindings(), false);
- validateBindingProperties(queueRegistry.getQueue(durableTopicQueueName).getBindings(), true);
- validateBindingProperties(queueRegistry.getQueue(durableExclusiveQueueName).getBindings(), false);
- }
-
- /**
- * Validate that each queue is bound only once following recovery (i.e. that bindings for non durable
- * queues or to non durable exchanges are not recovered), and if a selector should be present
- * that it is and contains the correct value
- *
- * @param bindings the set of bindings to validate
- * @param useSelectors if set, check the binding has a JMS_SELECTOR argument and the correct value for it
- */
- private void validateBindingProperties(List<Binding> bindings, boolean useSelectors)
- {
- assertEquals("Each queue should only be bound once.", 1, bindings.size());
-
- Binding binding = bindings.get(0);
-
- if (useSelectors)
- {
- assertTrue("Binding does not contain a Selector argument.",
- binding.getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.toString()));
- assertEquals("The binding selector argument is incorrect", SELECTOR_VALUE,
- binding.getArguments().get(AMQPFilterTypes.JMS_SELECTOR.toString()).toString());
- }
- }
-
- private void setQueueExclusivity(boolean exclusive) throws AMQException
- {
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName);
-
- queue.setExclusive(exclusive);
- }
-
- private void validateQueueExclusivityProperty(boolean expected)
- {
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- AMQQueue queue = queueRegistry.getQueue(durableExclusiveQueueName);
-
- assertEquals("Queue exclusivity was incorrect", queue.isExclusive(), expected);
- }
-
-
- private void validateDurableQueueProperties()
- {
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- validateQueueProperties(queueRegistry.getQueue(durablePriorityQueueName), true, true, false, false);
- validateQueueProperties(queueRegistry.getQueue(durablePriorityTopicQueueName), true, true, false, false);
- validateQueueProperties(queueRegistry.getQueue(durableQueueName), false, true, false, false);
- validateQueueProperties(queueRegistry.getQueue(durableTopicQueueName), false, true, false, false);
- validateQueueProperties(queueRegistry.getQueue(durableExclusiveQueueName), false, true, true, false);
- validateQueueProperties(queueRegistry.getQueue(durableLastValueQueueName), false, true, true, true);
- }
-
- private void validateQueueProperties(AMQQueue queue, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
- {
- if(usePriority || lastValueQueue)
- {
- assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue);
- }
-
- if (usePriority)
- {
- assertEquals("Queue is no longer a Priority Queue", AMQPriorityQueue.class, queue.getClass());
- assertEquals("Priority Queue does not have set priorities",
- DEFAULT_PRIORTY_LEVEL, ((AMQPriorityQueue) queue).getPriorities());
- }
- else if (lastValueQueue)
- {
- assertEquals("Queue is no longer a LastValue Queue", ConflationQueue.class, queue.getClass());
- assertEquals("LastValue Queue Key has changed", LVQ_KEY, ((ConflationQueue) queue).getConflationKey());
- }
- else
- {
- assertEquals("Queue is not 'simple'", SimpleAMQQueue.class, queue.getClass());
- }
-
- assertEquals("Queue owner is not as expected", queueOwner, queue.getOwner());
- assertEquals("Queue durability is not as expected", durable, queue.isDurable());
- assertEquals("Queue exclusivity is not as expected", exclusive, queue.isExclusive());
- }
-
- /**
- * Delete the Store Environment path
- *
- * @param environmentPath The configuration that contains the store environment path.
- */
- private void cleanup(File environmentPath)
- {
- if (environmentPath.exists())
- {
- FileUtils.delete(environmentPath, true);
- }
- }
-
- private void sendMessageOnExchange(Exchange exchange, AMQShortString routingKey, boolean deliveryMode)
- {
- //Set MessagePersistence
- BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
- properties.setDeliveryMode(deliveryMode ? Integer.valueOf(2).byteValue() : Integer.valueOf(1).byteValue());
- FieldTable headers = properties.getHeaders();
- headers.setString("Test", "MST");
- properties.setHeaders(headers);
-
- MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey);
-
- final IncomingMessage currentMessage;
-
-
- currentMessage = new IncomingMessage(messageInfo);
-
- currentMessage.setExchange(exchange);
-
- ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l);
-
- try
- {
- currentMessage.setContentHeaderBody(headerBody);
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
-
- currentMessage.setExpiration();
-
- MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis());
- currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd));
- currentMessage.getStoredMessage().flushToStore();
- currentMessage.route();
-
-
- // check and deliver if header says body length is zero
- if (currentMessage.allContentReceived())
- {
- ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
- final List<? extends BaseQueue> destinationQueues = currentMessage.getDestinationQueues();
- trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() {
- public void postCommit()
- {
- try
- {
- AMQMessage message = new AMQMessage(currentMessage.getStoredMessage());
-
- for(BaseQueue queue : destinationQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- e.printStackTrace();
- }
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
- }
- }
-
- private void createAllQueues()
- {
- //Register Durable Priority Queue
- createQueue(durablePriorityQueueName, true, true, false, false);
-
- //Register Durable Simple Queue
- createQueue(durableQueueName, false, true, false, false);
-
- //Register Durable Exclusive Simple Queue
- createQueue(durableExclusiveQueueName, false, true, true, false);
-
- //Register Durable LastValue Queue
- createQueue(durableLastValueQueueName, false, true, true, true);
-
- //Register NON-Durable Priority Queue
- createQueue(priorityQueueName, true, false, false, false);
-
- //Register NON-Durable Simple Queue
- createQueue(queueName, false, false, false, false);
- }
-
- private void createAllTopicQueues()
- {
- //Register Durable Priority Queue
- createQueue(durablePriorityTopicQueueName, true, true, false, false);
-
- //Register Durable Simple Queue
- createQueue(durableTopicQueueName, false, true, false, false);
-
- //Register NON-Durable Priority Queue
- createQueue(priorityTopicQueueName, true, false, false, false);
-
- //Register NON-Durable Simple Queue
- createQueue(topicQueueName, false, false, false, false);
- }
-
- private void createQueue(AMQShortString queueName, boolean usePriority, boolean durable, boolean exclusive, boolean lastValueQueue)
- {
-
- FieldTable queueArguments = null;
-
- if(usePriority || lastValueQueue)
- {
- assertNotSame("Queues cant be both Priority and LastValue based", usePriority, lastValueQueue);
- }
-
- if (usePriority)
- {
- queueArguments = new FieldTable();
- queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
- }
-
- if (lastValueQueue)
- {
- queueArguments = new FieldTable();
- queueArguments.put(new AMQShortString(AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY), LVQ_KEY);
- }
-
- AMQQueue queue = null;
-
- //Ideally we would be able to use the QueueDeclareHandler here.
- try
- {
- queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName.asString(), durable, queueOwner.asString(), false, exclusive,
- getVirtualHost(), FieldTable.convertToMap(queueArguments));
-
- validateQueueProperties(queue, usePriority, durable, exclusive, lastValueQueue);
-
- if (queue.isDurable() && !queue.isAutoDelete())
- {
- DurableConfigurationStoreHelper.createQueue(getVirtualHost().getDurableConfigurationStore(),
- queue,
- queueArguments);
- }
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
-
- getVirtualHost().getQueueRegistry().registerQueue(queue);
-
- }
-
- private Map<String, Exchange> createExchanges()
- {
- Map<String, Exchange> exchanges = new HashMap<String, Exchange>();
-
- //Register non-durable DirectExchange
- exchanges.put(nonDurableExchangeName, createExchange(DirectExchange.TYPE, nonDurableExchangeName, false));
-
- //Register durable DirectExchange and TopicExchange
- exchanges.put(directExchangeName ,createExchange(DirectExchange.TYPE, directExchangeName, true));
- exchanges.put(topicExchangeName,createExchange(TopicExchange.TYPE, topicExchangeName, true));
-
- return exchanges;
- }
-
- private Exchange createExchange(ExchangeType<?> type, String name, boolean durable)
- {
- Exchange exchange = null;
-
- try
- {
- exchange = getVirtualHost().createExchange(null, name, type.getName().toString(), durable, false, null);
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
-
- return exchange;
- }
-
- private void bindAllQueuesToExchange(Exchange exchange, AMQShortString routingKey)
- {
- FieldTable queueArguments = new FieldTable();
- queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
-
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityQueueName), false, queueArguments);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableQueueName), false, null);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityQueueName), false, queueArguments);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(queueName), false, null);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableExclusiveQueueName), false, null);
- }
-
- private void bindAllTopicQueuesToExchange(Exchange exchange, AMQShortString routingKey)
- {
- FieldTable queueArguments = new FieldTable();
- queueArguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), DEFAULT_PRIORTY_LEVEL);
-
- QueueRegistry queueRegistry = getVirtualHost().getQueueRegistry();
-
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durablePriorityTopicQueueName), true, queueArguments);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(durableTopicQueueName), true, null);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(priorityTopicQueueName), true, queueArguments);
- bindQueueToExchange(exchange, routingKey, queueRegistry.getQueue(topicQueueName), true, null);
- }
-
-
- protected void bindQueueToExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments)
- {
- FieldTable bindArguments = null;
-
- if (useSelector)
- {
- bindArguments = new FieldTable();
- bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE );
- }
-
- try
- {
- exchange.addBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(bindArguments));
- }
- catch (Exception e)
- {
- fail(e.getMessage());
- }
- }
-
- protected void unbindQueueFromExchange(Exchange exchange, AMQShortString routingKey, AMQQueue queue, boolean useSelector, FieldTable queueArguments)
- {
- FieldTable bindArguments = null;
-
- if (useSelector)
- {
- bindArguments = new FieldTable();
- bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), SELECTOR_VALUE );
- }
-
- try
- {
- exchange.removeBinding(String.valueOf(routingKey), queue, FieldTable.convertToMap(bindArguments));
- }
- catch (Exception e)
- {
- fail(e.getMessage());
- }
- }
-
- private void validateMessageOnTopics(long messageCount, boolean allQueues)
- {
- validateMessageOnQueue(durablePriorityTopicQueueName, messageCount);
- validateMessageOnQueue(durableTopicQueueName, messageCount);
-
- if (allQueues)
- {
- validateMessageOnQueue(priorityTopicQueueName, messageCount);
- validateMessageOnQueue(topicQueueName, messageCount);
- }
- }
-
- private void validateMessageOnQueues(long messageCount, boolean allQueues)
- {
- validateMessageOnQueue(durablePriorityQueueName, messageCount);
- validateMessageOnQueue(durableQueueName, messageCount);
-
- if (allQueues)
- {
- validateMessageOnQueue(priorityQueueName, messageCount);
- validateMessageOnQueue(queueName, messageCount);
- }
- }
-
- private void validateMessageOnQueue(AMQShortString queueName, long messageCount)
- {
- AMQQueue queue = getVirtualHost().getQueueRegistry().getQueue(queueName);
-
- assertNotNull("Queue(" + queueName + ") not correctly registered:", queue);
-
- assertEquals("Incorrect Message count on queue:" + queueName, messageCount, queue.getMessageCount());
- }
-
- private class TestMessagePublishInfo implements MessagePublishInfo
- {
-
- Exchange _exchange;
- boolean _immediate;
- boolean _mandatory;
- AMQShortString _routingKey;
-
- TestMessagePublishInfo(Exchange exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
- {
- _exchange = exchange;
- _immediate = immediate;
- _mandatory = mandatory;
- _routingKey = routingKey;
- }
-
- public AMQShortString getExchange()
- {
- return _exchange.getNameShortString();
- }
-
- public void setExchange(AMQShortString exchange)
- {
- //no-op
- }
-
- public boolean isImmediate()
- {
- return _immediate;
- }
-
- public boolean isMandatory()
- {
- return _mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
new file mode 100644
index 0000000000..32df355c07
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+
+/** A simple message store that stores the messages in a thread-safe structure in memory. */
+public class TestMemoryMessageStore extends AbstractMemoryMessageStore
+{
+ public static final String TYPE = "TestMemory";
+
+ @Override
+ public String getStoreType()
+ {
+ return TYPE;
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java
new file mode 100644
index 0000000000..fd2d4215ab
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.store;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.plugin.MessageStoreFactory;
+
+public class TestMemoryMessageStoreFactory implements MessageStoreFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return TestMemoryMessageStore.TYPE;
+ }
+
+ @Override
+ public MessageStore createMessageStore()
+ {
+ return new TestMemoryMessageStore();
+ }
+
+ @Override
+ public Map<String, Object> convertStoreConfiguration(Configuration configuration)
+ {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void validateAttributes(Map<String, Object> attributes)
+ {
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 210408f490..bb3c0cf535 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Adds some extra methods to the memory message store for testing purposes.
*/
-public class TestableMemoryMessageStore extends MemoryMessageStore
+public class TestableMemoryMessageStore extends TestMemoryMessageStore
{
private final Map<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
private final AtomicInteger _messageCount = new AtomicInteger(0);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index 6b8ea0e80b..e72196c383 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.virtualhost;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
@@ -34,7 +36,7 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -306,7 +308,7 @@ public class StandardVirtualHostTest extends QpidTestCase
writer.write(" <" + vhostName + ">");
writer.write(" <type>" + StandardVirtualHostFactory.TYPE + "</type>");
writer.write(" <store>");
- writer.write(" <class>" + MemoryMessageStore.class.getName() + "</class>");
+ writer.write(" <class>" + TestMemoryMessageStore.class.getName() + "</class>");
writer.write(" </store>");
if(exchangeName != null && !dontDeclare)
{
@@ -363,10 +365,11 @@ public class StandardVirtualHostTest extends QpidTestCase
_virtualHostRegistry = broker.getVirtualHostRegistry();
Configuration config = new PropertiesConfiguration();
- config.setProperty("store.type", MemoryMessageStore.TYPE);
VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker);
+ final org.apache.qpid.server.model.VirtualHost virtualHost = mock(org.apache.qpid.server.model.VirtualHost.class);
+ when(virtualHost.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_TYPE))).thenReturn(TestMemoryMessageStore.TYPE);
VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration,
- mock(org.apache.qpid.server.model.VirtualHost.class));
+ virtualHost);
_virtualHostRegistry.registerVirtualHost(host);
return host;
}
diff --git a/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory b/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
new file mode 100644
index 0000000000..9512fb8117
--- /dev/null
+++ b/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.store.TestMemoryMessageStoreFactory