summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-08-16 10:22:07 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-08-16 10:22:07 +0000
commite80a585caa90715069cd66e19ce40f0859086149 (patch)
tree7123ff15569b5ff3dd602df0c3b8f03a9e92bc8c /qpid/java
parentccb4bc1e6fc19bb7275164820d2f4a072aea96ee (diff)
downloadqpid-python-e80a585caa90715069cd66e19ce40f0859086149.tar.gz
QPID-5073 : Add dependency on alternate exchange for queues where such an alternate is set
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1514639 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java88
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java118
2 files changed, 167 insertions, 39 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
index 4e06cf3202..7929cd3e39 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.Queue;
@@ -62,21 +64,48 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
private class UnresolvedQueue implements UnresolvedObject<AMQQueue>
{
+ private final Map<String, Object> _attributes;
+ private final UUID _alternateExchangeId;
+ private final UUID _id;
private AMQQueue _queue;
+ private List<UnresolvedDependency> _dependencies = new ArrayList<UnresolvedDependency>();
+ private Exchange _alternateExchange;
public UnresolvedQueue(final UUID id,
final String type,
- final Map<String, Object> attributeMap)
+ final Map<String, Object> attributes)
{
- String queueName = (String) attributeMap.get(Queue.NAME);
- String owner = (String) attributeMap.get(Queue.OWNER);
- boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
- UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
+ _attributes = attributes;
+ _alternateExchangeId = _attributes.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String) _attributes
+ .get(Queue.ALTERNATE_EXCHANGE));
+ _id = id;
+ if (_alternateExchangeId != null)
+ {
+ _alternateExchange = _exchangeRegistry.getExchange(_alternateExchangeId);
+ if(_alternateExchange == null)
+ {
+ _dependencies.add(new AlternateExchangeDependency());
+ }
+ }
+ }
+
+ @Override
+ public UnresolvedDependency[] getUnresolvedDependencies()
+ {
+ return _dependencies.toArray(new UnresolvedDependency[_dependencies.size()]);
+ }
+
+ @Override
+ public AMQQueue resolve()
+ {
+ String queueName = (String) _attributes.get(Queue.NAME);
+ String owner = (String) _attributes.get(Queue.OWNER);
+ boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE);
@SuppressWarnings("unchecked")
- Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS);
+ Map<String, Object> queueArgumentsMap = (Map<String, Object>) _attributes.get(Queue.ARGUMENTS);
try
{
- _queue = _virtualHost.getQueueRegistry().getQueue(id);
+ _queue = _virtualHost.getQueueRegistry().getQueue(_id);
if(_queue == null)
{
_queue = _virtualHost.getQueueRegistry().getQueue(queueName);
@@ -84,38 +113,43 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
if (_queue == null)
{
- _queue = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
- queueArgumentsMap);
+ _queue = AMQQueueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive, _virtualHost,
+ queueArgumentsMap);
_virtualHost.getQueueRegistry().registerQueue(_queue);
- if (alternateExchangeId != null)
+ if (_alternateExchange != null)
{
- Exchange altExchange = _exchangeRegistry.getExchange(alternateExchangeId);
- if (altExchange == null)
- {
- _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
- return;
- }
- _queue.setAlternateExchange(altExchange);
+ _queue.setAlternateExchange(_alternateExchange);
}
}
}
catch (AMQException e)
{
- throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e);
+ throw new RuntimeException("Error recovering queue uuid " + _id + " name " + queueName, e);
}
+ return _queue;
}
- @Override
- public UnresolvedDependency[] getUnresolvedDependencies()
+ private class AlternateExchangeDependency implements UnresolvedDependency
{
- return new UnresolvedDependency[0];
- }
+ @Override
+ public UUID getId()
+ {
+ return _alternateExchangeId;
+ }
- @Override
- public AMQQueue resolve()
- {
- return _queue;
+ @Override
+ public String getType()
+ {
+ return "Exchange";
+ }
+
+ @Override
+ public void resolve(final Object dependency)
+ {
+ _alternateExchange = (Exchange) dependency;
+ _dependencies.remove(this);
+ }
}
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index ccc7f6a697..6a97eb38cb 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -28,7 +28,10 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
@@ -36,21 +39,27 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@@ -73,6 +82,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
private DurableConfigurationStore _store;
private ExchangeFactory _exchangeFactory;
private ExchangeRegistry _exchangeRegistry;
+ private QueueRegistry _queueRegistry;
@Override
public void setUp() throws Exception
@@ -95,10 +105,57 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
- QueueRegistry queueRegistry = mock(QueueRegistry.class);
- when(_vhost.getQueueRegistry()).thenReturn(queueRegistry);
- when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
+ final ArgumentCaptor<Exchange> registeredExchange = ArgumentCaptor.forClass(Exchange.class);
+ doAnswer(new Answer()
+ {
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ Exchange exchange = registeredExchange.getValue();
+ when(_exchangeRegistry.getExchange(exchange.getId())).thenReturn(exchange);
+ when(_exchangeRegistry.getExchange(exchange.getName())).thenReturn(exchange);
+ return null;
+ }
+ }).when(_exchangeRegistry).registerExchange(registeredExchange.capture());
+
+
+
+ _queueRegistry = mock(QueueRegistry.class);
+ when(_vhost.getQueueRegistry()).thenReturn(_queueRegistry);
+
+ when(_queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
+
+ final ArgumentCaptor<AMQQueue> registeredQueue = ArgumentCaptor.forClass(AMQQueue.class);
+ doAnswer(new Answer()
+ {
+
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ AMQQueue queue = registeredQueue.getValue();
+ when(_queueRegistry.getQueue(queue.getId())).thenReturn(queue);
+ when(_queueRegistry.getQueue(queue.getName())).thenReturn(queue);
+ return null;
+ }
+ }).when(_queueRegistry).registerQueue(registeredQueue.capture());
+
+ /* These lines necessary to get queue creation to work because AMQQueueFactory is called directly rather than
+ queue creation being on vhost - yuck! */
+ SecurityManager securityManager = mock(SecurityManager.class);
+ when(_vhost.getSecurityManager()).thenReturn(securityManager);
+ when(securityManager.authoriseCreateQueue(anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),
+ any(AMQShortString.class),anyString())).thenReturn(true);
+ VirtualHostConfiguration configuration = mock(VirtualHostConfiguration.class);
+ when(_vhost.getConfiguration()).thenReturn(configuration);
+ QueueConfiguration queueConfiguration = mock(QueueConfiguration.class);
+ when(configuration.getQueueConfiguration(anyString())).thenReturn(queueConfiguration);
+ LogActor logActor = mock(LogActor.class);
+ CurrentActor.set(logActor);
+ RootMessageLogger rootLogger = mock(RootMessageLogger.class);
+ when(logActor.getRootMessageLogger()).thenReturn(rootLogger);
+ /* end of queue creation mock hackery */
_exchangeFactory = mock(ExchangeFactory.class);
@@ -208,15 +265,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
eq(HeadersExchange.TYPE.getType()),
anyBoolean(),
anyBoolean())).thenReturn(customExchange);
- doAnswer(new Answer()
- {
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable
- {
- when(_exchangeRegistry.getExchange(eq(customExchangeId))).thenReturn(customExchange);
- return null;
- }
- }).when(_exchangeRegistry).registerExchange(customExchange);
final ConfiguredObjectRecord[] expected = {
new ConfiguredObjectRecord(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
@@ -318,6 +366,35 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
}
+ public void testRecoveryOfQueueAlternateExchange() throws Exception
+ {
+
+ final UUID queueId = new UUID(1, 0);
+ final UUID exchangeId = new UUID(2, 0);
+
+
+
+ final Exchange customExchange = mock(Exchange.class);
+
+ when(_exchangeFactory.createExchange(eq(exchangeId),
+ eq(CUSTOM_EXCHANGE_NAME),
+ eq(HeadersExchange.TYPE.getType()),
+ anyBoolean(),
+ anyBoolean())).thenReturn(customExchange);
+
+ _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
+
+ _durableConfigurationRecoverer.configuredObject(queueId, Queue.class.getSimpleName(),
+ createQueue("testQueue", exchangeId));
+ _durableConfigurationRecoverer.configuredObject(exchangeId,
+ org.apache.qpid.server.model.Exchange.class.getSimpleName(),
+ createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE));
+
+ _durableConfigurationRecoverer.completeConfigurationRecovery();
+
+ assertEquals(_queueRegistry.getQueue(queueId).getAlternateExchange(), customExchange);
+ }
+
private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException
{
doAnswer(new Answer()
@@ -373,4 +450,21 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
return exchange;
}
+
+
+ private Map<String, Object> createQueue(String name, UUID alternateExchangeId)
+ {
+ Map<String, Object> queue = new LinkedHashMap<String, Object>();
+
+ queue.put(Queue.NAME, name);
+ if(alternateExchangeId != null)
+ {
+ queue.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeId.toString());
+ }
+ queue.put(Queue.EXCLUSIVE, false);
+
+ return queue;
+
+ }
+
}