diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-02 15:26:42 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-02 15:26:42 +0000 |
| commit | 706370f431afa3e812b94878fd9bc6a09a0920d5 (patch) | |
| tree | fc3ab91d71d6a12f56de333eee696ea9d902c735 /qpid/java/broker/src/test | |
| parent | 296e94464d89b8d9affa0e73ba2d015a475dc88d (diff) | |
| download | qpid-python-706370f431afa3e812b94878fd9bc6a09a0920d5.tar.gz | |
QPID-4973 : [Java Broker] Refactor DurableConfigurationStore
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1498976 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/test')
3 files changed, 112 insertions, 59 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java index 57e1fcd15c..4a6b3f2cad 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java @@ -21,9 +21,8 @@ package org.apache.qpid.server.store; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.isA; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -32,7 +31,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; import java.io.File; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -48,13 +46,12 @@ import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.TestLogActor; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockStoredMessage; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.server.store.Transaction.Record; import org.apache.qpid.server.store.derby.DerbyMessageStore; @@ -71,9 +68,6 @@ public class DurableConfigurationStoreTest extends QpidTestCase private VirtualHost _virtualHost; private ConfigurationRecoveryHandler _recoveryHandler; - private QueueRecoveryHandler _queueRecoveryHandler; - private ExchangeRecoveryHandler _exchangeRecoveryHandler; - private BindingRecoveryHandler _bindingRecoveryHandler; private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; private TransactionLogRecoveryHandler _logRecoveryHandler; @@ -101,9 +95,6 @@ public class DurableConfigurationStoreTest extends QpidTestCase setTestSystemProperty("QPID_WORK", TMP_FOLDER); _configuration = mock(Configuration.class); _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - _queueRecoveryHandler = mock(QueueRecoveryHandler.class); - _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class); - _bindingRecoveryHandler = mock(BindingRecoveryHandler.class); _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); @@ -112,9 +103,6 @@ public class DurableConfigurationStoreTest extends QpidTestCase _virtualHost = mock(VirtualHost.class); when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); - when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler); - when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler); - when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler); when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); when(_exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(EXCHANGE_NAME)); @@ -140,21 +128,44 @@ public class DurableConfigurationStoreTest extends QpidTestCase public void testCreateExchange() throws Exception { Exchange exchange = createTestExchange(); - _configStore.createExchange(exchange); + DurableConfigurationStoreHelper.createExchange(_configStore, exchange); reopenStore(); - verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true); + verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(org.apache.qpid.server.model.Exchange.class.getName()), + eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(), + org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type", + org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString()))); + } + + private Map<String,Object> map(Object... vals) + { + Map<String,Object> map = new HashMap<String, Object>(); + boolean isValue = false; + String key = null; + for(Object obj : vals) + { + if(isValue) + { + map.put(key,obj); + } + else + { + key = (String) obj; + } + isValue = !isValue; + } + return map; } public void testRemoveExchange() throws Exception { Exchange exchange = createTestExchange(); - _configStore.createExchange(exchange); + DurableConfigurationStoreHelper.createExchange(_configStore, exchange); - _configStore.removeExchange(exchange); + DurableConfigurationStoreHelper.removeExchange(_configStore, exchange); reopenStore(); - verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean()); + verify(_recoveryHandler, never()).configuredObject(any(UUID.class), anyString(), anyMap()); } public void testBindQueue() throws Exception @@ -162,13 +173,18 @@ public class DurableConfigurationStoreTest extends QpidTestCase AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); - _configStore.bindQueue(binding); + DurableConfigurationStoreHelper.createBinding(_configStore, binding); reopenStore(); - ByteBuffer argsAsBytes = ByteBuffer.wrap(_bindingArgs.getDataAsBytes()); + Map<String,Object> map = new HashMap<String, Object>(); + map.put(org.apache.qpid.server.model.Binding.EXCHANGE, _exchange.getId().toString()); + map.put(org.apache.qpid.server.model.Binding.QUEUE, queue.getId().toString()); + map.put(org.apache.qpid.server.model.Binding.NAME, ROUTING_KEY); + map.put(org.apache.qpid.server.model.Binding.ARGUMENTS,FieldTable.convertToMap(_bindingArgs)); - verify(_bindingRecoveryHandler).binding(binding.getId(), _exchange.getId(), queue.getId(), ROUTING_KEY, argsAsBytes); + verify(_recoveryHandler).configuredObject(eq(binding.getId()), eq(org.apache.qpid.server.model.Binding.class.getName()), + eq(map)); } public void testUnbindQueue() throws Exception @@ -176,22 +192,27 @@ public class DurableConfigurationStoreTest extends QpidTestCase AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); - _configStore.bindQueue(binding); + DurableConfigurationStoreHelper.createBinding(_configStore, binding); - _configStore.unbindQueue(binding); + DurableConfigurationStoreHelper.removeBinding(_configStore, binding); reopenStore(); - verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(), - isA(ByteBuffer.class)); + verify(_recoveryHandler, never()).configuredObject(any(UUID.class), + eq(org.apache.qpid.server.model.Binding.class.getName()), + anyMap()); } public void testCreateQueueAMQQueue() throws Exception { AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - _configStore.createQueue(queue); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); reopenStore(); - verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, null); + Map<String, Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); } public void testCreateQueueAMQQueueFieldTable() throws Exception @@ -202,10 +223,19 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _configStore.createQueue(queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); reopenStore(); - verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments, null); + + + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.ARGUMENTS, attributes); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); } public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception @@ -213,10 +243,17 @@ public class DurableConfigurationStoreTest extends QpidTestCase Exchange alternateExchange = createTestAlternateExchange(); AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange); - _configStore.createQueue(queue); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); reopenStore(); - verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null, alternateExchange.getId()); + + Map<String, Object> queueAttributes = new HashMap<String, Object>(); + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); + queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); } private Exchange createTestAlternateExchange() @@ -235,14 +272,25 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _configStore.createQueue(queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); // update the queue to have exclusive=false queue = createTestQueue(getName(), getName() + "Owner", false); - _configStore.updateQueue(queue); + when(queue.getArguments()).thenReturn(attributes); + + DurableConfigurationStoreHelper.updateQueue(_configStore, queue); reopenStore(); - verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, null); + + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); + queueAttributes.put(Queue.ARGUMENTS, attributes); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); + } public void testUpdateQueueAlternateExchange() throws Exception @@ -253,15 +301,26 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _configStore.createQueue(queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); // update the queue to have exclusive=false Exchange alternateExchange = createTestAlternateExchange(); queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange); - _configStore.updateQueue(queue); + when(queue.getArguments()).thenReturn(attributes); + + DurableConfigurationStoreHelper.updateQueue(_configStore, queue); reopenStore(); - verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments, alternateExchange.getId()); + + Map<String,Object> queueAttributes = new HashMap<String, Object>(); + + queueAttributes.put(Queue.NAME, getName()); + queueAttributes.put(Queue.OWNER, getName()+"Owner"); + queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); + queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); + + verify(_recoveryHandler).configuredObject(eq(_queueId), eq(Queue.class.getName()), eq(queueAttributes)); } public void testRemoveQueue() throws Exception @@ -272,13 +331,14 @@ public class DurableConfigurationStoreTest extends QpidTestCase attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); FieldTable arguments = FieldTable.convertToFieldTable(attributes); - _configStore.createQueue(queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); // remove queue - _configStore.removeQueue(queue); + DurableConfigurationStoreHelper.removeQueue(_configStore,queue); reopenStore(); - verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(), - any(FieldTable.class), any(UUID.class)); + verify(_recoveryHandler, never()).configuredObject(any(UUID.class), + eq(org.apache.qpid.server.model.Queue.class.getName()), + anyMap()); } private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 5eea002365..af4bbd1731 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -346,7 +346,8 @@ public class MessageStoreTest extends QpidTestCase 1, queueRegistry.getQueues().size()); //test that removing the queue means it is not recovered next time - getVirtualHost().getDurableConfigurationStore().removeQueue(queueRegistry.getQueue(durableQueueName)); + final AMQQueue queue = queueRegistry.getQueue(durableQueueName); + DurableConfigurationStoreHelper.removeQueue(getVirtualHost().getDurableConfigurationStore(),queue); reloadVirtualHost(); @@ -399,7 +400,8 @@ public class MessageStoreTest extends QpidTestCase origExchangeCount + 1, exchangeRegistry.getExchangeNames().size()); //test that removing the exchange means it is not recovered next time - getVirtualHost().getDurableConfigurationStore().removeExchange(exchangeRegistry.getExchange(directExchangeName)); + final Exchange exchange = exchangeRegistry.getExchange(directExchangeName); + DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), exchange); reloadVirtualHost(); @@ -755,7 +757,9 @@ public class MessageStoreTest extends QpidTestCase if (queue.isDurable() && !queue.isAutoDelete()) { - getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments); + DurableConfigurationStoreHelper.createQueue(getVirtualHost().getDurableConfigurationStore(), + queue, + queueArguments); } } catch (AMQException e) @@ -799,7 +803,8 @@ public class MessageStoreTest extends QpidTestCase getVirtualHost().getExchangeRegistry().registerExchange(exchange); if (durable) { - getVirtualHost().getDurableConfigurationStore().createExchange(exchange); + DurableConfigurationStoreHelper.createExchange(getVirtualHost().getDurableConfigurationStore(), + exchange); } } catch (AMQException e) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java index a73057ebc1..2d68e94fcd 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java @@ -26,18 +26,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; -import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; import org.apache.qpid.test.utils.QpidTestCase; public abstract class MessageStoreTestCase extends QpidTestCase { private ConfigurationRecoveryHandler _recoveryHandler; - private QueueRecoveryHandler _queueRecoveryHandler; - private ExchangeRecoveryHandler _exchangeRecoveryHandler; - private BindingRecoveryHandler _bindingRecoveryHandler; private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; private TransactionLogRecoveryHandler _logRecoveryHandler; @@ -52,9 +46,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase super.setUp(); _recoveryHandler = mock(ConfigurationRecoveryHandler.class); - _queueRecoveryHandler = mock(QueueRecoveryHandler.class); - _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class); - _bindingRecoveryHandler = mock(BindingRecoveryHandler.class); _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); @@ -63,9 +54,6 @@ public abstract class MessageStoreTestCase extends QpidTestCase _virtualHost = mock(VirtualHost.class); when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); - when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_exchangeRecoveryHandler); - when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_queueRecoveryHandler); - when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_bindingRecoveryHandler); when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); |
