diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-08-16 10:22:07 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-08-16 10:22:07 +0000 |
| commit | e80a585caa90715069cd66e19ce40f0859086149 (patch) | |
| tree | 7123ff15569b5ff3dd602df0c3b8f03a9e92bc8c /qpid/java | |
| parent | ccb4bc1e6fc19bb7275164820d2f4a072aea96ee (diff) | |
| download | qpid-python-e80a585caa90715069cd66e19ce40f0859086149.tar.gz | |
QPID-5073 : Add dependency on alternate exchange for queues where such an alternate is set
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1514639 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
2 files changed, 167 insertions, 39 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index 4e06cf3202..7929cd3e39 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.model.Queue; @@ -62,21 +64,48 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ private class UnresolvedQueue implements UnresolvedObject<AMQQueue> { + private final Map<String, Object> _attributes; + private final UUID _alternateExchangeId; + private final UUID _id; private AMQQueue _queue; + private List<UnresolvedDependency> _dependencies = new ArrayList<UnresolvedDependency>(); + private Exchange _alternateExchange; public UnresolvedQueue(final UUID id, final String type, - final Map<String, Object> attributeMap) + final Map<String, Object> attributes) { - String queueName = (String) attributeMap.get(Queue.NAME); - String owner = (String) attributeMap.get(Queue.OWNER); - boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE); - UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE)); + _attributes = attributes; + _alternateExchangeId = _attributes.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String) _attributes + .get(Queue.ALTERNATE_EXCHANGE)); + _id = id; + if (_alternateExchangeId != null) + { + _alternateExchange = _exchangeRegistry.getExchange(_alternateExchangeId); + if(_alternateExchange == null) + { + _dependencies.add(new AlternateExchangeDependency()); + } + } + } + + @Override + public UnresolvedDependency[] getUnresolvedDependencies() + { + return _dependencies.toArray(new UnresolvedDependency[_dependencies.size()]); + } + + @Override + public AMQQueue resolve() + { + String queueName = (String) _attributes.get(Queue.NAME); + String owner = (String) _attributes.get(Queue.OWNER); + boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE); @SuppressWarnings("unchecked") - Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS); + Map<String, Object> queueArgumentsMap = (Map<String, Object>) _attributes.get(Queue.ARGUMENTS); try { - _queue = _virtualHost.getQueueRegistry().getQueue(id); + _queue = _virtualHost.getQueueRegistry().getQueue(_id); if(_queue == null) { _queue = _virtualHost.getQueueRegistry().getQueue(queueName); @@ -84,38 +113,43 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ if (_queue == null) { - _queue = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost, - queueArgumentsMap); + _queue = AMQQueueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive, _virtualHost, + queueArgumentsMap); _virtualHost.getQueueRegistry().registerQueue(_queue); - if (alternateExchangeId != null) + if (_alternateExchange != null) { - Exchange altExchange = _exchangeRegistry.getExchange(alternateExchangeId); - if (altExchange == null) - { - _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id); - return; - } - _queue.setAlternateExchange(altExchange); + _queue.setAlternateExchange(_alternateExchange); } } } catch (AMQException e) { - throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e); + throw new RuntimeException("Error recovering queue uuid " + _id + " name " + queueName, e); } + return _queue; } - @Override - public UnresolvedDependency[] getUnresolvedDependencies() + private class AlternateExchangeDependency implements UnresolvedDependency { - return new UnresolvedDependency[0]; - } + @Override + public UUID getId() + { + return _alternateExchangeId; + } - @Override - public AMQQueue resolve() - { - return _queue; + @Override + public String getType() + { + return "Exchange"; + } + + @Override + public void resolve(final Object dependency) + { + _alternateExchange = (Exchange) dependency; + _dependencies.remove(this); + } } } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index ccc7f6a697..6a97eb38cb 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -28,7 +28,10 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; @@ -36,21 +39,27 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.RootMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.security.*; +import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -73,6 +82,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private DurableConfigurationStore _store; private ExchangeFactory _exchangeFactory; private ExchangeRegistry _exchangeRegistry; + private QueueRegistry _queueRegistry; @Override public void setUp() throws Exception @@ -95,10 +105,57 @@ public class DurableConfigurationRecovererTest extends QpidTestCase when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); - QueueRegistry queueRegistry = mock(QueueRegistry.class); - when(_vhost.getQueueRegistry()).thenReturn(queueRegistry); - when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); + final ArgumentCaptor<Exchange> registeredExchange = ArgumentCaptor.forClass(Exchange.class); + doAnswer(new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + Exchange exchange = registeredExchange.getValue(); + when(_exchangeRegistry.getExchange(exchange.getId())).thenReturn(exchange); + when(_exchangeRegistry.getExchange(exchange.getName())).thenReturn(exchange); + return null; + } + }).when(_exchangeRegistry).registerExchange(registeredExchange.capture()); + + + + _queueRegistry = mock(QueueRegistry.class); + when(_vhost.getQueueRegistry()).thenReturn(_queueRegistry); + + when(_queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); + + final ArgumentCaptor<AMQQueue> registeredQueue = ArgumentCaptor.forClass(AMQQueue.class); + doAnswer(new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + AMQQueue queue = registeredQueue.getValue(); + when(_queueRegistry.getQueue(queue.getId())).thenReturn(queue); + when(_queueRegistry.getQueue(queue.getName())).thenReturn(queue); + return null; + } + }).when(_queueRegistry).registerQueue(registeredQueue.capture()); + + /* These lines necessary to get queue creation to work because AMQQueueFactory is called directly rather than + queue creation being on vhost - yuck! */ + SecurityManager securityManager = mock(SecurityManager.class); + when(_vhost.getSecurityManager()).thenReturn(securityManager); + when(securityManager.authoriseCreateQueue(anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(), + any(AMQShortString.class),anyString())).thenReturn(true); + VirtualHostConfiguration configuration = mock(VirtualHostConfiguration.class); + when(_vhost.getConfiguration()).thenReturn(configuration); + QueueConfiguration queueConfiguration = mock(QueueConfiguration.class); + when(configuration.getQueueConfiguration(anyString())).thenReturn(queueConfiguration); + LogActor logActor = mock(LogActor.class); + CurrentActor.set(logActor); + RootMessageLogger rootLogger = mock(RootMessageLogger.class); + when(logActor.getRootMessageLogger()).thenReturn(rootLogger); + /* end of queue creation mock hackery */ _exchangeFactory = mock(ExchangeFactory.class); @@ -208,15 +265,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase eq(HeadersExchange.TYPE.getType()), anyBoolean(), anyBoolean())).thenReturn(customExchange); - doAnswer(new Answer() - { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable - { - when(_exchangeRegistry.getExchange(eq(customExchangeId))).thenReturn(customExchange); - return null; - } - }).when(_exchangeRegistry).registerExchange(customExchange); final ConfiguredObjectRecord[] expected = { new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding", @@ -318,6 +366,35 @@ public class DurableConfigurationRecovererTest extends QpidTestCase } + public void testRecoveryOfQueueAlternateExchange() throws Exception + { + + final UUID queueId = new UUID(1, 0); + final UUID exchangeId = new UUID(2, 0); + + + + final Exchange customExchange = mock(Exchange.class); + + when(_exchangeFactory.createExchange(eq(exchangeId), + eq(CUSTOM_EXCHANGE_NAME), + eq(HeadersExchange.TYPE.getType()), + anyBoolean(), + anyBoolean())).thenReturn(customExchange); + + _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2); + + _durableConfigurationRecoverer.configuredObject(queueId, Queue.class.getSimpleName(), + createQueue("testQueue", exchangeId)); + _durableConfigurationRecoverer.configuredObject(exchangeId, + org.apache.qpid.server.model.Exchange.class.getSimpleName(), + createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)); + + _durableConfigurationRecoverer.completeConfigurationRecovery(); + + assertEquals(_queueRegistry.getQueue(queueId).getAlternateExchange(), customExchange); + } + private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException { doAnswer(new Answer() @@ -373,4 +450,21 @@ public class DurableConfigurationRecovererTest extends QpidTestCase return exchange; } + + + private Map<String, Object> createQueue(String name, UUID alternateExchangeId) + { + Map<String, Object> queue = new LinkedHashMap<String, Object>(); + + queue.put(Queue.NAME, name); + if(alternateExchangeId != null) + { + queue.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeId.toString()); + } + queue.put(Queue.EXCLUSIVE, false); + + return queue; + + } + } |
