diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-08-18 09:13:02 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-08-18 09:13:02 +0000 |
| commit | ab6fffad2230229810c995253a6f021e42e03aaf (patch) | |
| tree | fdee7a99130750af8d7c71d25c358a282e17e405 /qpid/java/broker/src | |
| parent | 35b5c7fd8c761d41caa88505e8c2fee319e92a84 (diff) | |
| download | qpid-python-ab6fffad2230229810c995253a6f021e42e03aaf.tar.gz | |
QPID-5081 : [Java Broker] Refactor Queue Creation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1515079 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
36 files changed, 1059 insertions, 469 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 53dd6df599..631490ab5f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -167,11 +167,6 @@ public abstract class AbstractExchange implements Exchange return _virtualHost; } - public QueueRegistry getQueueRegistry() - { - return getVirtualHost().getQueueRegistry(); - } - public final boolean isBound(AMQShortString routingKey, FieldTable ft, AMQQueue queue) { return isBound(routingKey == null ? "" : routingKey.asString(), FieldTable.convertToMap(ft), queue); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 2873eb31e8..8e9f980e6b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -47,6 +47,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchange implements Exchange { + private final QueueRegistry _queueRegistry; private UUID _id; private VirtualHost _virtualHost; private static final Logger _logger = Logger.getLogger(DefaultExchange.class); @@ -55,6 +56,11 @@ public class DefaultExchange implements Exchange private LogSubject _logSubject; private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>(); + public DefaultExchange(QueueRegistry queueRegistry) + { + _queueRegistry = queueRegistry; + } + @Override public void initialise(UUID id, @@ -82,7 +88,7 @@ public class DefaultExchange implements Exchange @Override public long getBindingCount() { - return _virtualHost.getQueueRegistry().getQueues().size(); + return _virtualHost.getQueues().size(); } @Override @@ -146,7 +152,7 @@ public class DefaultExchange implements Exchange @Override public Binding getBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments) { - if(_virtualHost.getQueueRegistry().getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty())) + if(_virtualHost.getQueue(bindingKey) == queue && (arguments == null || arguments.isEmpty())) { return convertToBinding(queue); } @@ -207,7 +213,7 @@ public class DefaultExchange implements Exchange @Override public List<AMQQueue> route(InboundMessage message) { - AMQQueue q = _virtualHost.getQueueRegistry().getQueue(message.getRoutingKey()); + AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); if(q == null) { List<AMQQueue> noQueues = Collections.emptyList(); @@ -235,13 +241,13 @@ public class DefaultExchange implements Exchange @Override public boolean isBound(AMQShortString routingKey) { - return _virtualHost.getQueueRegistry().getQueue(routingKey) != null; + return _virtualHost.getQueue(routingKey == null ? null : routingKey.toString()) != null; } @Override public boolean isBound(AMQQueue queue) { - return _virtualHost.getQueueRegistry().getQueue(queue.getName()) == queue; + return _virtualHost.getQueue(queue.getName()) == queue; } @Override @@ -283,7 +289,7 @@ public class DefaultExchange implements Exchange @Override public boolean isBound(String bindingKey) { - return _virtualHost.getQueueRegistry().getQueue(bindingKey) != null; + return _virtualHost.getQueue(bindingKey) != null; } @Override @@ -320,7 +326,7 @@ public class DefaultExchange implements Exchange public Collection<Binding> getBindings() { List<Binding> bindings = new ArrayList<Binding>(); - for(AMQQueue q : _virtualHost.getQueueRegistry().getQueues()) + for(AMQQueue q : _virtualHost.getQueues()) { bindings.add(convertToBinding(q)); } @@ -330,7 +336,7 @@ public class DefaultExchange implements Exchange @Override public void addBindingListener(BindingListener listener) { - _virtualHost.getQueueRegistry().addRegistryChangeListener(convertListener(listener));//To change body of implemented methods use File | Settings | File Templates. + _queueRegistry.addRegistryChangeListener(convertListener(listener)); } private QueueRegistry.RegistryChangeListener convertListener(final BindingListener listener) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 75c489c731..d8263a3c80 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -27,6 +27,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -40,20 +41,23 @@ import java.util.concurrent.ConcurrentMap; public class DefaultExchangeRegistry implements ExchangeRegistry { private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class); - /** * Maps from exchange name to exchange instance */ private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>(); private Exchange _defaultExchange; - private VirtualHost _host; + + private final VirtualHost _host; + private final QueueRegistry _queueRegistry; + private final Collection<RegistryChangeListener> _listeners = Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>()); - public DefaultExchangeRegistry(VirtualHost host) + public DefaultExchangeRegistry(VirtualHost host, QueueRegistry queueRegistry) { _host = host; + _queueRegistry = queueRegistry; } public void initialise(ExchangeFactory exchangeFactory) throws AMQException @@ -61,7 +65,7 @@ public class DefaultExchangeRegistry implements ExchangeRegistry //create 'standard' exchanges: new ExchangeInitialiser().initialise(exchangeFactory, this, getDurableConfigurationStore()); - _defaultExchange = new DefaultExchange(); + _defaultExchange = new DefaultExchange(_queueRegistry); UUID defaultExchangeId = UUIDGenerator.generateExchangeUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString(), _host.getName()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java index 6fe0607ab2..ae2031bd71 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java @@ -89,6 +89,7 @@ public interface Queue extends ConfiguredObject public static final String EXCLUSIVE = "exclusive"; public static final String MESSAGE_GROUP_KEY = "messageGroupKey"; public static final String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups"; + public static final String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup"; public static final String LVQ_KEY = "lvqKey"; public static final String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts"; public static final String NO_LOCAL = "noLocal"; @@ -100,6 +101,10 @@ public interface Queue extends ConfiguredObject public static final String TYPE = "type"; public static final String PRIORITIES = "priorities"; + public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change + + public static final String FEDERATION_EXCLUDES = "federationExcludes"; + public static final String FEDERATION_ID = "federationId"; public static final Collection<String> AVAILABLE_ATTRIBUTES = @@ -134,6 +139,7 @@ public interface Queue extends ConfiguredObject PRIORITIES )); + //children Collection<Binding> getBindings(); Collection<Consumer> getConsumers(); @@ -144,6 +150,6 @@ public interface Queue extends ConfiguredObject void visit(QueueEntryVisitor visitor); void delete(); - + void setNotificationListener(QueueNotificationListener listener); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java index a84a041b72..26ac99d5bd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -120,7 +120,7 @@ public interface VirtualHost extends ConfiguredObject QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, CONFIG_PATH)); - int CURRENT_CONFIG_VERSION = 2; + int CURRENT_CONFIG_VERSION = 3; //children Collection<VirtualHostAlias> getAliases(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 157b97cc07..96a7eacb92 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -66,25 +66,6 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs put(DESCRIPTION, String.class); }}); - static final Map<String, String> ATTRIBUTE_MAPPINGS = new HashMap<String, String>(); - static - { - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_REPEAT_GAP, AMQQueueFactory.X_QPID_MINIMUM_ALERT_REPEAT_GAP); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_AGE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_AGE); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_SIZE); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, AMQQueueFactory.X_QPID_MAXIMUM_MESSAGE_COUNT); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, AMQQueueFactory.X_QPID_MAXIMUM_QUEUE_DEPTH); - - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT); - - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, AMQQueueFactory.X_QPID_CAPACITY); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, AMQQueueFactory.X_QPID_FLOW_RESUME_CAPACITY); - - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.PRIORITIES, AMQQueueFactory.X_QPID_PRIORITIES); - } - private final AMQQueue _queue; private final Map<Binding, BindingAdapter> _bindingAdapters = new HashMap<Binding, BindingAdapter>(); @@ -190,15 +171,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { try { - QueueRegistry queueRegistry = _queue.getVirtualHost().getQueueRegistry(); - synchronized(queueRegistry) - { - _queue.delete(); - if (_queue.isDurable()) - { - DurableConfigurationStoreHelper.removeQueue(_queue.getVirtualHost().getDurableConfigurationStore(), _queue); - } - } + _queue.getVirtualHost().removeQueue(_queue); } catch(AMQException e) { @@ -414,13 +387,12 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs } else if(MESSAGE_GROUP_KEY.equals(name)) { - return _queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY); + return _queue.getAttribute(MESSAGE_GROUP_KEY); } else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name)) { //We only return the boolean value if message groups are actually in use - return getAttribute(MESSAGE_GROUP_KEY) == null ? null : - SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(_queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP)); + return getAttribute(MESSAGE_GROUP_KEY) == null ? null : _queue.getAttribute(MESSAGE_GROUP_SHARED_GROUPS); } else if(LVQ_KEY.equals(name)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index c09dd9449e..977fd5ae56 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -41,12 +41,9 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.SystemConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration; -import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -68,6 +65,7 @@ import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.SimpleAMQQueue; @@ -86,6 +84,7 @@ import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHostListener; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener { @@ -203,7 +202,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual private void populateQueues() { - Collection<AMQQueue> actualQueues = _virtualHost.getQueueRegistry().getQueues(); + Collection<AMQQueue> actualQueues = _virtualHost.getQueues(); if ( actualQueues != null ) { synchronized(_queueAdapters) @@ -399,7 +398,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null) { - attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LVQ_KEY); + attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY); } else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null) { @@ -415,7 +414,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { String key = MapValueConverter.getStringAttribute(Queue.MESSAGE_GROUP_KEY, attributes); attributes.remove(Queue.MESSAGE_GROUP_KEY); - attributes.put(SimpleAMQQueue.QPID_GROUP_HEADER_KEY, key); + attributes.put(QueueArgumentsConverter.QPID_GROUP_HEADER_KEY, key); } if (attributes.containsKey(Queue.MESSAGE_GROUP_SHARED_GROUPS)) @@ -423,7 +422,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual if(MapValueConverter.getBooleanAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS, attributes)) { attributes.remove(Queue.MESSAGE_GROUP_SHARED_GROUPS); - attributes.put(SimpleAMQQueue.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); + attributes.put(QueueArgumentsConverter.QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); } } @@ -440,15 +439,6 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual attributes.remove(Queue.LIFETIME_POLICY); attributes.remove(Queue.TIME_TO_LIVE); - List<String> attrNames = new ArrayList<String>(attributes.keySet()); - for(String attr : attrNames) - { - if(QueueAdapter.ATTRIBUTE_MAPPINGS.containsKey(attr)) - { - attributes.put(QueueAdapter.ATTRIBUTE_MAPPINGS.get(attr),attributes.remove(attr)); - } - } - return createQueue(name, state, durable, exclusive, lifetime, ttl, attributes); } @@ -472,33 +462,26 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual owner = authenticatedPrincipal.getName(); } } + + final boolean autoDelete = lifetime == LifetimePolicy.AUTO_DELETE; + try { - QueueRegistry queueRegistry = _virtualHost.getQueueRegistry(); - synchronized (queueRegistry) - { - if(_virtualHost.getQueueRegistry().getQueue(name)!=null) - { - throw new IllegalArgumentException("Queue with name "+name+" already exists"); - } - AMQQueue queue = - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name, - durable, owner, lifetime == LifetimePolicy.AUTO_DELETE, - exclusive, _virtualHost, attributes); - if(durable) - { - DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), - queue, - FieldTable.convertToFieldTable(attributes)); - } - synchronized (_queueAdapters) - { - return _queueAdapters.get(queue); - } + AMQQueue queue = + _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(name, _virtualHost.getName()), name, + durable, owner, autoDelete, exclusive, autoDelete && exclusive, attributes); + + synchronized (_queueAdapters) + { + return _queueAdapters.get(queue); } } + catch(QueueExistsException qe) + { + throw new IllegalArgumentException("Queue with name "+name+" already exists"); + } catch(AMQException e) { throw new IllegalArgumentException(e); @@ -1057,7 +1040,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { if(VirtualHost.QUEUE_COUNT.equals(name)) { - return _vhost.getQueueRegistry().getQueues().size(); + return _vhost.getQueues().size(); } else if(VirtualHost.EXCHANGE_COUNT.equals(name)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 4f610cc925..cb6a9249d3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -225,7 +225,8 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa void setAlternateExchange(Exchange exchange); - Map<String, Object> getArguments(); + Collection<String> getAvailableAttributes(); + Object getAttribute(String attrName); void checkCapacity(AMQSessionModel channel); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 1eeb6dccf3..5001c2fd2b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -29,42 +29,31 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.VirtualHost; -public class AMQQueueFactory +public class AMQQueueFactory implements QueueFactory { - public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity"; - public static final String X_QPID_CAPACITY = "x-qpid-capacity"; - public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap"; - public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count"; - public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size"; - public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age"; - public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth"; - - public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; - public static final String X_QPID_DESCRIPTION = "x-qpid-description"; - public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; - public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; - public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; - public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; + public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key"; + - public static final String DLQ_ROUTING_KEY = "dlq"; - public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled"; - public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count"; public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; + public static final String DLQ_ROUTING_KEY = "dlq"; + + private final VirtualHost _virtualHost; + private final QueueRegistry _queueRegistry; - private AMQQueueFactory() + public AMQQueueFactory(VirtualHost virtualHost, QueueRegistry queueRegistry) { + _virtualHost = virtualHost; + _queueRegistry = queueRegistry; } private abstract static class QueueProperty @@ -129,56 +118,56 @@ public class AMQQueueFactory } private static final QueueProperty[] DECLAREABLE_PROPERTIES = { - new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE) + new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumMessageAge(value); } }, - new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE) + new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumMessageSize(value); } }, - new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT) + new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumMessageCount(value); } }, - new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH) + new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMaximumQueueDepth(value); } }, - new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP) + new QueueLongProperty(Queue.ALERT_REPEAT_GAP) { public void setPropertyValue(AMQQueue queue, long value) { queue.setMinimumAlertRepeatGap(value); } }, - new QueueLongProperty(X_QPID_CAPACITY) + new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES) { public void setPropertyValue(AMQQueue queue, long value) { queue.setCapacity(value); } }, - new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY) + new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES) { public void setPropertyValue(AMQQueue queue, long value) { queue.setFlowResumeCapacity(value); } }, - new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT) + new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS) { public void setPropertyValue(AMQQueue queue, int value) { @@ -189,13 +178,17 @@ public class AMQQueueFactory /** * @param id the id to use. + * @param deleteOnNoConsumer */ - public static AMQQueue createAMQQueueImpl(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException + @Override + public AMQQueue createAMQQueueImpl(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQSecurityException, AMQException { if (id == null) { @@ -206,16 +199,11 @@ public class AMQQueueFactory throw new IllegalArgumentException("Queue name must not be null"); } - // Access check - if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner)) - { - String description = "Permission denied: queue-name '" + queueName + "'"; - throw new AMQSecurityException(description); - } - QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName); - boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration); - if (isDLQEnabled) + QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName); + + boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration); + if (createDLQ) { validateDLNames(queueName); } @@ -226,17 +214,17 @@ public class AMQQueueFactory if(arguments != null) { - if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) + if(arguments.containsKey(Queue.LVQ_KEY)) { - conflationKey = (String) arguments.get(QPID_LAST_VALUE_QUEUE_KEY); + conflationKey = (String) arguments.get(Queue.LVQ_KEY); if(conflationKey == null) { - conflationKey = QPID_LVQ_KEY; + conflationKey = QPID_DEFAULT_LVQ_KEY; } } - else if(arguments.containsKey(X_QPID_PRIORITIES)) + else if(arguments.containsKey(Queue.PRIORITIES)) { - Object prioritiesObj = arguments.get(X_QPID_PRIORITIES); + Object prioritiesObj = arguments.get(Queue.PRIORITIES); if(prioritiesObj instanceof Number) { priorities = ((Number)prioritiesObj).intValue(); @@ -257,33 +245,36 @@ public class AMQQueueFactory // TODO - should warn here of invalid format } } - else if(arguments.containsKey(QPID_QUEUE_SORT_KEY)) + else if(arguments.containsKey(Queue.SORT_KEY)) { - sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY); + sortingKey = (String)arguments.get(Queue.SORT_KEY); } } AMQQueue q; if(sortingKey != null) { - q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey); + q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey); } else if(conflationKey != null) { - q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); + q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey); } else if(priorities > 1) { - q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities); + q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities); } else { - q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments); + q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments); } + q.setDeleteOnNoConsumers(deleteOnNoConsumer); + //Register the new queue - virtualHost.getQueueRegistry().registerQueue(q); - q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName)); + _queueRegistry.registerQueue(q); + + q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName)); if(arguments != null) { @@ -294,21 +285,25 @@ public class AMQQueueFactory p.setPropertyValue(q, arguments.get(p.getArgumentName().toString())); } } + + if(arguments.get(Queue.NO_LOCAL) instanceof Boolean) + { + q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL)); + } + } - if(isDLQEnabled) + if(createDLQ) { final String dlExchangeName = getDeadLetterExchangeName(queueName); final String dlQueueName = getDeadLetterQueueName(queueName); - final QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - Exchange dlExchange = null; - final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName()); + final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName()); try { - dlExchange = virtualHost.createExchange(dlExchangeId, + dlExchange = _virtualHost.createExchange(dlExchangeId, dlExchangeName, ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(), true, false, null); @@ -321,23 +316,19 @@ public class AMQQueueFactory AMQQueue dlQueue = null; - synchronized(queueRegistry) + synchronized(_queueRegistry) { - dlQueue = queueRegistry.getQueue(dlQueueName); + dlQueue = _queueRegistry.getQueue(dlQueueName); if(dlQueue == null) { //set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc final Map<String, Object> args = new HashMap<String, Object>(); - args.put(X_QPID_DLQ_ENABLED, false); - args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0); - - dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args); + args.put(Queue.CREATE_DLQ_ON_CREATION, false); + args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); - //enter the dlq in the persistent store - DurableConfigurationStoreHelper.createQueue(virtualHost.getDurableConfigurationStore(), - dlQueue, - FieldTable.convertToFieldTable(args)); + dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive, + false, args); } } @@ -350,11 +341,31 @@ public class AMQQueueFactory } q.setAlternateExchange(dlExchange); } + else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String) + { + + final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE); + Exchange altExchange; + try + { + altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr)); + } + catch(IllegalArgumentException e) + { + altExchange = _virtualHost.getExchange(altExchangeAttr); + } + q.setAlternateExchange(altExchange); + } + + if (q.isDurable() && !q.isAutoDelete()) + { + DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q); + } return q; } - public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException + public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws AMQException { String queueName = config.getName(); @@ -365,9 +376,9 @@ public class AMQQueueFactory Map<String, Object> arguments = createQueueArgumentsFromConfig(config); // we need queues that are defined in config to have deterministic ids. - UUID id = UUIDGenerator.generateQueueUUID(queueName, host.getName()); + UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName()); - AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments); + AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, false, arguments); q.configure(config); return q; } @@ -414,21 +425,23 @@ public class AMQQueueFactory * queue configuration * @return true if DLQ enabled */ - protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig) + protected static boolean createDLQ(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig) { //feature is not to be enabled for temporary queues or when explicitly disabled by argument - if (!autoDelete) + if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) { - boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED); + boolean dlqArgumentPresent = arguments != null + && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION); if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled()) { boolean dlqEnabled = true; if (dlqArgumentPresent) { - Object argument = arguments.get(X_QPID_DLQ_ENABLED); - dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue(); + Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION); + dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue()) + || (argument instanceof String && Boolean.parseBoolean(argument.toString())); } - return dlqEnabled; + return dlqEnabled ; } } return false; @@ -464,31 +477,30 @@ public class AMQQueueFactory if(config.getArguments() != null && !config.getArguments().isEmpty()) { - arguments.putAll(config.getArguments()); + arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments()))); } if(config.isLVQ() || config.getLVQKey() != null) { - arguments.put(QPID_LAST_VALUE_QUEUE, 1); - arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey()); + arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey()); } else if (config.getPriority() || config.getPriorities() > 0) { - arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); + arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities()); } else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey())) { - arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey()); + arguments.put(Queue.SORT_KEY, config.getQueueSortKey()); } if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled()) { - arguments.put(X_QPID_DLQ_ENABLED, true); + arguments.put(Queue.CREATE_DLQ_ON_CREATION, true); } if (config.getDescription() != null && !"".equals(config.getDescription())) { - arguments.put(X_QPID_DESCRIPTION, config.getDescription()); + arguments.put(Queue.DESCRIPTION, config.getDescription()); } if (arguments.isEmpty()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index 27a9e13617..7308433759 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -59,8 +59,9 @@ public class DefaultQueueRegistry implements QueueRegistry } } - public void unregisterQueue(AMQShortString name) + public void unregisterQueue(String nameString) { + AMQShortString name = new AMQShortString(nameString); AMQQueue q = _queueMap.remove(name); if(q != null) { @@ -74,16 +75,11 @@ public class DefaultQueueRegistry implements QueueRegistry } } - public AMQQueue getQueue(AMQShortString name) + private AMQQueue getQueue(AMQShortString name) { return _queueMap.get(name); } - public Collection<AMQShortString> getQueueNames() - { - return _queueMap.keySet(); - } - public Collection<AMQQueue> getQueues() { return _queueMap.values(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java new file mode 100644 index 0000000000..f5bee850c2 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.qpid.server.model.Queue; + +public class QueueArgumentsConverter +{ + public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity"; + public static final String X_QPID_CAPACITY = "x-qpid-capacity"; + public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap"; + public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count"; + public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size"; + public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age"; + public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth"; + + public static final String QPID_ALERT_COUNT = "qpid.alert_count"; + public static final String QPID_ALERT_SIZE = "qpid.alert_size"; + public static final String QPID_ALERT_REPEAT_GAP = "qpid.alert_repeat_gap"; + + public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; + + public static final String X_QPID_DESCRIPTION = "x-qpid-description"; + /* public static final String QPID_LVQ_KEY = "qpid.LVQ_key"; + public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; + */ + public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key"; + + public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key"; + public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled"; + public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count"; + public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; + public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; + public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group"; + public static final String QPID_TRACE_EXCLUDE = "qpid.trace.exclude"; + public static final String QPID_TRACE_ID = "qpid.trace.id"; + + public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue"; + + /** + * No-local queue argument is used to support the no-local feature of Durable Subscribers. + */ + public static final String QPID_NO_LOCAL = "no-local"; + static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>(); + static + { + ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP); + ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_AGE, Queue.ALERT_THRESHOLD_MESSAGE_AGE); + ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_SIZE, Queue.ALERT_THRESHOLD_MESSAGE_SIZE); + + ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_MESSAGE_COUNT, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); + ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_QUEUE_DEPTH, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); + ATTRIBUTE_MAPPINGS.put(QPID_ALERT_COUNT, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); + ATTRIBUTE_MAPPINGS.put(QPID_ALERT_SIZE, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); + ATTRIBUTE_MAPPINGS.put(QPID_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP); + + ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_DELIVERY_COUNT, Queue.MAXIMUM_DELIVERY_ATTEMPTS); + + ATTRIBUTE_MAPPINGS.put(X_QPID_CAPACITY, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES); + ATTRIBUTE_MAPPINGS.put(X_QPID_FLOW_RESUME_CAPACITY, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES); + + ATTRIBUTE_MAPPINGS.put(QPID_QUEUE_SORT_KEY, Queue.SORT_KEY); + ATTRIBUTE_MAPPINGS.put(QPID_LAST_VALUE_QUEUE_KEY, Queue.LVQ_KEY); + ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, Queue.PRIORITIES); + + ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION); + + ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, Queue.CREATE_DLQ_ON_CREATION); + ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY); + //ATTRIBUTE_MAPPINGS.put(QPID_SHARED_MSG_GROUP, Queue.MESSAGE_GROUP_SHARED_GROUPS); + ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP); + ATTRIBUTE_MAPPINGS.put(QPID_TRACE_EXCLUDE, Queue.FEDERATION_EXCLUDES); + ATTRIBUTE_MAPPINGS.put(QPID_TRACE_ID, Queue.FEDERATION_ID); + ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL); + + } + + + public static Map<String,Object> convertWireArgsToModel(Map<String,Object> wireArguments) + { + Map<String,Object> modelArguments = new HashMap<String, Object>(); + if(wireArguments != null) + { + for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet()) + { + if(wireArguments.containsKey(entry.getKey())) + { + modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey())); + } + } + if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY)) + { + modelArguments.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY); + } + if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)) + { + modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS, + SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(wireArguments.get(QPID_SHARED_MSG_GROUP))); + } + if(wireArguments.get(X_QPID_DLQ_ENABLED) != null) + { + modelArguments.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.parseBoolean(wireArguments.get(X_QPID_DLQ_ENABLED).toString())); + } + + if(wireArguments.get(QPID_NO_LOCAL) != null) + { + modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString())); + } + + } + return modelArguments; + } + + + public static Map<String,Object> convertModelArgsToWire(Map<String,Object> modelArguments) + { + Map<String,Object> wireArguments = new HashMap<String, Object>(); + for(Map.Entry<String,String> entry : ATTRIBUTE_MAPPINGS.entrySet()) + { + if(modelArguments.containsKey(entry.getValue())) + { + wireArguments.put(entry.getKey(), modelArguments.get(entry.getValue())); + } + } + + if(Boolean.TRUE.equals(modelArguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) + { + wireArguments.put(QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); + } + + return wireArguments; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java new file mode 100644 index 0000000000..5411a2bc9c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueFactory.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.Map; +import java.util.UUID; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; + +public interface QueueFactory +{ + AMQQueue createAMQQueueImpl(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQSecurityException, AMQException; +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index e8c34128e9..bc1d5942bd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -32,11 +31,7 @@ public interface QueueRegistry void registerQueue(AMQQueue queue); - void unregisterQueue(AMQShortString name); - - AMQQueue getQueue(AMQShortString name); - - Collection<AMQShortString> getQueueNames(); + void unregisterQueue(String name); Collection<AMQQueue> getQueues(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b0ab93162a..e3dbd62b6c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -51,6 +51,7 @@ import org.apache.qpid.server.logging.actors.QueueActor; import org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.logging.subjects.QueueLogSubject; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager; @@ -68,12 +69,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); - public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; - public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group"; public static final String SHARED_MSG_GROUP_ARG_VALUE = "1"; - private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group"; private static final String QPID_NO_GROUP = "qpid.no-group"; private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP); + // TODO - should make this configurable at the vhost / broker level private static final int DEFAULT_MAX_GROUPS = 255; @@ -237,7 +236,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _exclusive = exclusive; _virtualHost = virtualHost; _entries = entryListFactory.createQueueEntryList(this); - _arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments); + _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments)); _id = id; _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); @@ -255,19 +254,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes durable, !durable, _entries.getPriorities() > 0)); - if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY)) + if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY)) { - if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals(SHARED_MSG_GROUP_ARG_VALUE)) + if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null + && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) { - Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP_ARG); + Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); _messageGroupManager = - new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), + new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)), defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), this); } else { - _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), DEFAULT_MAX_GROUPS); + _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get( + Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); } } else @@ -358,13 +359,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _alternateExchange = exchange; } - /** - * Arguments used to create this queue. The caller is assured - * that null will never be returned. - */ - public Map<String, Object> getArguments() + + @Override + public Collection<String> getAvailableAttributes() + { + return new ArrayList<String>(_arguments.keySet()); + } + + @Override + public Object getAttribute(String attrName) { - return _arguments; + return _arguments.get(attrName); } public boolean isAutoDelete() @@ -511,7 +516,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _logger.info("Auto-deleteing queue:" + this); } - delete(); + getVirtualHost().removeQueue(this); // we need to manually fire the event to the removed subscription (which was the last one left for this // queue. This is because the delete method uses the subscription set which has just been cleared @@ -1340,7 +1345,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes } } - _virtualHost.getQueueRegistry().unregisterQueue(_name); List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter() { @@ -2282,18 +2286,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { if (description == null) { - _arguments.remove(AMQQueueFactory.X_QPID_DESCRIPTION); + _arguments.remove(Queue.DESCRIPTION); } else { - _arguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, description); + _arguments.put(Queue.DESCRIPTION, description); } } @Override public String getDescription() { - return (String) _arguments.get(AMQQueueFactory.X_QPID_DESCRIPTION); + return (String) _arguments.get(Queue.DESCRIPTION); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java index 931368cb97..960986ec45 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -167,12 +167,12 @@ public class SecurityManager implements ConfigurationChangeListener { String pluginTypeName = getPluginTypeName(accessControl); _hostPlugins.put(pluginTypeName, accessControl); - + if(_logger.isDebugEnabled()) { _logger.debug("Added access control to host plugins with name: " + vhostName); } - + break; } } @@ -366,7 +366,7 @@ public class SecurityManager implements ConfigurationChangeListener } public boolean authoriseCreateQueue(final Boolean autoDelete, final Boolean durable, final Boolean exclusive, - final Boolean nowait, final Boolean passive, final AMQShortString queueName, final String owner) + final Boolean nowait, final Boolean passive, final String queueName, final String owner) { return checkAllPlugins(new AccessCheck() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java index 6c631fc360..893b371d11 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java @@ -212,7 +212,7 @@ public class ObjectProperties } public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive, Boolean nowait, Boolean passive, - AMQShortString queueName, String owner) + String queueName, String owner) { super(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java index efb1e95e99..e9181c0e12 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.store; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -32,6 +33,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; public class DurableConfigurationStoreHelper { @@ -46,28 +48,23 @@ public class DurableConfigurationStoreHelper attributesMap.put(Queue.NAME, queue.getName()); attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner())); attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); + if (queue.getAlternateExchange() != null) { attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); } - else - { - attributesMap.remove(Queue.ALTERNATE_EXCHANGE); - } - if (attributesMap.containsKey(Queue.ARGUMENTS)) - { - // We wouldn't need this if createQueueConfiguredObject took only AMQQueue - Map<String, Object> currentArgs = (Map<String, Object>) attributesMap.get(Queue.ARGUMENTS); - currentArgs.putAll(queue.getArguments()); - } - else + + Collection<String> availableAttrs = queue.getAvailableAttributes(); + + for(String attrName : availableAttrs) { - attributesMap.put(Queue.ARGUMENTS, queue.getArguments()); + attributesMap.put(attrName, queue.getAttribute(attrName)); } + store.update(queue.getId(), QUEUE, attributesMap); } - public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments) + public static void createQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException { Map<String, Object> attributesMap = new HashMap<String, Object>(); @@ -78,11 +75,9 @@ public class DurableConfigurationStoreHelper { attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId()); } - // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments. - // It would also do away with the need for the if/then/else within updateQueueConfiguredObject - if (arguments != null) + for(String attrName : queue.getAvailableAttributes()) { - attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments)); + attributesMap.put(attrName, queue.getAttribute(attrName)); } store.create(queue.getId(), QUEUE,attributesMap); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 4e27a008dd..d87431a415 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -58,11 +59,13 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener { @@ -95,6 +98,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg private final ConnectionRegistry _connectionRegistry; private final DtxRegistry _dtxRegistry; + private final AMQQueueFactory _queueFactory; private volatile State _state = State.INITIALISING; @@ -136,11 +140,14 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount()); + _queueRegistry = new DefaultQueueRegistry(this); + _queueFactory = new AMQQueueFactory(this, _queueRegistry); + _exchangeFactory = new DefaultExchangeFactory(this); - _exchangeRegistry = new DefaultExchangeRegistry(this); + _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry); initialiseStatistics(); @@ -298,12 +305,12 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this); + AMQQueue queue = _queueFactory.createAMQQueueImpl(queueConfiguration); String queueName = queue.getName(); if (queue.isDurable()) { - DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue, null); + DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue); } //get the exchange name (returns default exchange name if none was specified) @@ -428,12 +435,102 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } @Override + public AMQQueue getQueue(String name) + { + return _queueRegistry.getQueue(name); + } + + @Override + public AMQQueue getQueue(UUID id) + { + return _queueRegistry.getQueue(id); + } + + @Override + public Collection<AMQQueue> getQueues() + { + return _queueRegistry.getQueues(); + } + + @Override + public int removeQueue(AMQQueue queue) throws AMQException + { + synchronized (getQueueRegistry()) + { + int purged = queue.delete(); + + getQueueRegistry().unregisterQueue(queue.getName()); + if (queue.isDurable() && !queue.isAutoDelete()) + { + DurableConfigurationStore store = getDurableConfigurationStore(); + DurableConfigurationStoreHelper.removeQueue(store, queue); + } + return purged; + } + } + + @Override + public AMQQueue createQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQException + { + // Access check + if (!getSecurityManager().authoriseCreateQueue(autoDelete, + durable, + exclusive, + null, + null, + queueName, + owner)) + { + String description = "Permission denied: queue-name '" + queueName + "'"; + throw new AMQSecurityException(description); + } + + synchronized (_queueRegistry) + { + if(_queueRegistry.getQueue(queueName) != null) + { + throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName)); + } + if(id == null) + { + + id = UUIDGenerator.generateExchangeUUID(queueName, getName()); + while(_queueRegistry.getQueue(id) != null) + { + id = UUID.randomUUID(); + } + + } + else if(_queueRegistry.getQueue(id) != null) + { + throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName)); + } + return _queueFactory.createAMQQueueImpl(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, + arguments); + } + + } + + @Override public Exchange getExchange(String name) { return _exchangeRegistry.getExchange(name); } @Override + public Exchange getExchange(UUID id) + { + return _exchangeRegistry.getExchange(id); + } + + @Override public Exchange getDefaultExchange() { return _exchangeRegistry.getDefaultExchange(); @@ -747,7 +844,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers() { DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(this, getExchangeRegistry()), + new QueueRecoverer(this, getExchangeRegistry(), _queueFactory), new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()), new BindingRecoverer(this, getExchangeRegistry()) }; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java index 7cfadbcadf..2d3a620e91 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java @@ -91,7 +91,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B { _unresolvedDependencies.add(new ExchangeDependency()); } - _queue = _virtualHost.getQueueRegistry().getQueue(_queueId); + _queue = _virtualHost.getQueue(_queueId); if(_queue == null) { _unresolvedDependencies.add(new QueueDependency()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index 3526551073..8d05e719ee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader; @@ -60,6 +61,9 @@ public class DefaultUpgraderProvider implements UpgraderProvider currentUpgrader = addUpgrader(currentUpgrader, new Version0Upgrader()); case 1: currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader()); + case 2: + currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader()); + case CURRENT_CONFIG_VERSION: currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer)); break; @@ -213,7 +217,7 @@ public class DefaultUpgraderProvider implements UpgraderProvider UUID queueId = UUID.fromString(queueIdString); ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId); return !((localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName())) - || _virtualHost.getQueueRegistry().getQueue(queueId) != null); + || _virtualHost.getQueue(queueId) != null); } private boolean isBinding(final String type) @@ -224,4 +228,39 @@ public class DefaultUpgraderProvider implements UpgraderProvider } + /* + * Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the + * attributes into the map using the model attribute names rather than the wire attribute names + */ + private class Version2Upgrader extends NonNullUpgrader + { + + private static final String ARGUMENTS = "arguments"; + + @Override + public void configuredObject(UUID id, String type, Map<String, Object> attributes) + { + if(Queue.class.getSimpleName().equals(type)) + { + Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(); + if(attributes.get(ARGUMENTS) instanceof Map) + { + newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) attributes + .get(ARGUMENTS))); + } + newAttributes.putAll(attributes); + attributes = newAttributes; + getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes)); + } + + getNextUpgrader().configuredObject(id,type,attributes); + } + + @Override + public void complete() + { + getNextUpgrader().complete(); + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java index 7929cd3e39..b4fbdf7544 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java @@ -20,18 +20,19 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.LinkedHashMap; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.UnresolvedDependency; import org.apache.qpid.server.store.UnresolvedObject; @@ -41,11 +42,15 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ private static final Logger _logger = Logger.getLogger(QueueRecoverer.class); private final VirtualHost _virtualHost; private final ExchangeRegistry _exchangeRegistry; + private final QueueFactory _queueFactory; - public QueueRecoverer(final VirtualHost virtualHost, final ExchangeRegistry exchangeRegistry) + public QueueRecoverer(final VirtualHost virtualHost, + final ExchangeRegistry exchangeRegistry, + final QueueFactory queueFactory) { _virtualHost = virtualHost; _exchangeRegistry = exchangeRegistry; + _queueFactory = queueFactory; } @Override @@ -101,26 +106,24 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ String queueName = (String) _attributes.get(Queue.NAME); String owner = (String) _attributes.get(Queue.OWNER); boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE); - @SuppressWarnings("unchecked") - Map<String, Object> queueArgumentsMap = (Map<String, Object>) _attributes.get(Queue.ARGUMENTS); + + Map<String, Object> queueArgumentsMap = new LinkedHashMap<String, Object>(_attributes); + queueArgumentsMap.remove(Queue.NAME); + queueArgumentsMap.remove(Queue.OWNER); + queueArgumentsMap.remove(Queue.EXCLUSIVE); + try { - _queue = _virtualHost.getQueueRegistry().getQueue(_id); + _queue = _virtualHost.getQueue(_id); if(_queue == null) { - _queue = _virtualHost.getQueueRegistry().getQueue(queueName); + _queue = _virtualHost.getQueue(queueName); } if (_queue == null) { - _queue = AMQQueueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive, _virtualHost, - queueArgumentsMap); - _virtualHost.getQueueRegistry().registerQueue(_queue); - - if (_alternateExchange != null) - { - _queue.setAlternateExchange(_alternateExchange); - } + _queue = _queueFactory.createAMQQueueImpl(_id, queueName, true, owner, false, exclusive, + false, queueArgumentsMap); } } catch (AMQException e) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index e06e785338..2ebbedccd4 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -21,15 +21,18 @@ package org.apache.qpid.server.virtualhost; import java.util.Collection; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ScheduledFuture; import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; import org.apache.qpid.common.Closeable; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; @@ -45,7 +48,23 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable String getName(); - QueueRegistry getQueueRegistry(); + AMQQueue getQueue(String name); + + AMQQueue getQueue(UUID id); + + Collection<AMQQueue> getQueues(); + + int removeQueue(AMQQueue queue) throws AMQException; + + AMQQueue createQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQException; + Exchange createExchange(UUID id, String exchange, @@ -58,6 +77,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable void removeExchange(Exchange exchange, boolean force) throws AMQException; Exchange getExchange(String name); + Exchange getExchange(UUID id); + Exchange getDefaultExchange(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 3738306f6a..39ca3197b4 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -119,7 +119,7 @@ public class VirtualHostConfigRecoveryHandler implements } for(Transaction.Record record : enqueues) { - final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId()); + final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId()); if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); @@ -179,7 +179,7 @@ public class VirtualHostConfigRecoveryHandler implements } for(Transaction.Record record : dequeues) { - final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId()); + final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId()); if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); @@ -268,7 +268,7 @@ public class VirtualHostConfigRecoveryHandler implements public void queueEntry(final UUID queueId, long messageId) { - AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId); + AMQQueue queue = _virtualHost.getQueue(queueId); try { if(queue != null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java new file mode 100644 index 0000000000..54f7d0d172 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/QueueExistsException.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugins; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.AMQQueue; + +public class QueueExistsException extends AMQException +{ + private final AMQQueue _existing; + + public QueueExistsException(String name, AMQQueue existing) + { + super(name); + _existing = existing; + } + + public AMQQueue getExistingQueue() + { + return _existing; + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index de6d036f29..dc9ddf7b32 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -113,17 +113,17 @@ public class VirtualHostConfigurationTest extends QpidTestCase VirtualHost vhost = createVirtualHost(getName()); // Check that atest was a priority queue with 5 priorities - AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); + AMQQueue atest = vhost.getQueue("atest"); assertTrue(atest instanceof AMQPriorityQueue); assertEquals(5, ((AMQPriorityQueue) atest).getPriorities()); // Check that ptest was a priority queue with 10 priorities - AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest")); + AMQQueue ptest = vhost.getQueue("ptest"); assertTrue(ptest instanceof AMQPriorityQueue); assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities()); // Check that ntest wasn't a priority queue - AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest")); + AMQQueue ntest = vhost.getQueue("ntest"); assertFalse(ntest instanceof AMQPriorityQueue); } @@ -146,13 +146,13 @@ public class VirtualHostConfigurationTest extends QpidTestCase VirtualHost vhost = createVirtualHost(getName()); // Check specifically configured values - AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest")); + AMQQueue aTest = vhost.getQueue("atest"); assertEquals(4, aTest.getMaximumQueueDepth()); assertEquals(5, aTest.getMaximumMessageSize()); assertEquals(6, aTest.getMaximumMessageAge()); // Check default values - AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest")); + AMQQueue bTest = vhost.getQueue("btest"); assertEquals(1, bTest.getMaximumQueueDepth()); assertEquals(2, bTest.getMaximumMessageSize()); assertEquals(3, bTest.getMaximumMessageAge()); @@ -214,10 +214,10 @@ public class VirtualHostConfigurationTest extends QpidTestCase assertFalse("c3p0 queue DLQ was configured as disabled", extra.getConfiguration().getQueueConfiguration("c3p0").isDeadLetterQueueEnabled()); // Get queues - AMQQueue biggles = test.getQueueRegistry().getQueue(new AMQShortString("biggles")); - AMQQueue beetle = test.getQueueRegistry().getQueue(new AMQShortString("beetle")); - AMQQueue r2d2 = extra.getQueueRegistry().getQueue(new AMQShortString("r2d2")); - AMQQueue c3p0 = extra.getQueueRegistry().getQueue(new AMQShortString("c3p0")); + AMQQueue biggles = test.getQueue("biggles"); + AMQQueue beetle = test.getQueue("beetle"); + AMQQueue r2d2 = extra.getQueue("r2d2"); + AMQQueue c3p0 = extra.getQueue("c3p0"); // Disabled specifically for this queue, overriding virtualhost setting assertNull("Biggles queue should not have alt exchange as DLQ should be configured as disabled: " + biggles.getAlternateExchange(), biggles.getAlternateExchange()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index f2a64381df..7adec3d595 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -75,7 +75,8 @@ public class TopicExchangeTest extends QpidTestCase public void testNoRoute() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); @@ -86,7 +87,8 @@ public class TopicExchangeTest extends QpidTestCase public void testDirectMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -108,7 +110,7 @@ public class TopicExchangeTest extends QpidTestCase public void testStarMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null)); @@ -139,7 +141,7 @@ public class TopicExchangeTest extends QpidTestCase public void testHashMatch() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null); _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null)); @@ -190,7 +192,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMidHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null)); routeMessage("a.c.d.b",0l); @@ -215,7 +218,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMatchafterHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null)); @@ -253,7 +257,8 @@ public class TopicExchangeTest extends QpidTestCase public void testHashAfterHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null)); int queueCount = routeMessage("a.c.b.b.c",0l); @@ -274,7 +279,8 @@ public class TopicExchangeTest extends QpidTestCase public void testHashHash() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null)); int queueCount = routeMessage("a.c.b.b.c",0l); @@ -295,7 +301,8 @@ public class TopicExchangeTest extends QpidTestCase public void testSubMatchFails() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null)); int queueCount = routeMessage("a.b.c",0l); @@ -326,7 +333,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreRouting() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); @@ -339,7 +347,8 @@ public class TopicExchangeTest extends QpidTestCase public void testMoreQueue() throws AMQException { - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null); + AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, + false, null); _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null)); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 693fd16b9f..a468fa072b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.server.queue; +import java.util.Collections; import junit.framework.AssertionFailedError; import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import java.util.ArrayList; +import org.apache.qpid.server.model.Queue; import static org.mockito.Mockito.when; @@ -38,9 +38,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest @Override public void setUp() throws Exception { - FieldTable arguments = new FieldTable(); - arguments.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 3); - setArguments(arguments); + setArguments(Collections.singletonMap(Queue.PRIORITIES,(Object)3)); super.setUp(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index c8e0e53d75..62c9b4c46d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,91 +20,213 @@ */ package org.apache.qpid.server.queue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.commons.configuration.XMLConfiguration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DefaultExchangeFactory; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.plugin.ExchangeType; +import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class AMQQueueFactoryTest extends QpidTestCase { private QueueRegistry _queueRegistry; private VirtualHost _virtualHost; - private Broker _broker; + private AMQQueueFactory _queueFactory; + private List<AMQQueue> _queues; + private QueueConfiguration _queueConfiguration; @Override public void setUp() throws Exception { super.setUp(); - BrokerTestHelper.setUp(); - XMLConfiguration configXml = new XMLConfiguration(); - configXml.addProperty("store.class", TestableMemoryMessageStore.class.getName()); + _queues = new ArrayList<AMQQueue>(); + _virtualHost = mock(VirtualHost.class); - _broker = BrokerTestHelper.createBrokerMock(); - if (getName().equals("testDeadLetterQueueDoesNotInheritDLQorMDCSettings")) - { - when(_broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(5); - when(_broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(true); - } + VirtualHostConfiguration vhostConfig = mock(VirtualHostConfiguration.class); + when(_virtualHost.getConfiguration()).thenReturn(vhostConfig); + _queueConfiguration = mock(QueueConfiguration.class); + when(vhostConfig.getQueueConfiguration(anyString())).thenReturn(_queueConfiguration); + LogActor logActor = mock(LogActor.class); + CurrentActor.set(logActor); + RootMessageLogger rootLogger = mock(RootMessageLogger.class); + when(logActor.getRootMessageLogger()).thenReturn(rootLogger); + DurableConfigurationStore store = mock(DurableConfigurationStore.class); + when(_virtualHost.getDurableConfigurationStore()).thenReturn(store); + + mockExchangeCreation(); + mockQueueRegistry(); + delegateVhostQueueCreation(); + + when(_virtualHost.getQueues()).thenReturn(_queues); + + + _queueFactory = new AMQQueueFactory(_virtualHost, _queueRegistry); - _virtualHost = BrokerTestHelper.createVirtualHost(new VirtualHostConfiguration(getName(), configXml, _broker)); - _queueRegistry = _virtualHost.getQueueRegistry(); } + private void delegateVhostQueueCreation() throws AMQException + { + final ArgumentCaptor<UUID> id = ArgumentCaptor.forClass(UUID.class); + final ArgumentCaptor<String> queueName = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<Boolean> durable = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<String> owner = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<Boolean> autoDelete = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<Boolean> exclusive = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<Boolean> deleteOnNoConsumer = ArgumentCaptor.forClass(Boolean.class); + final ArgumentCaptor<Map> arguments = ArgumentCaptor.forClass(Map.class); + + when(_virtualHost.createQueue(id.capture(), queueName.capture(), durable.capture(), owner.capture(), + autoDelete.capture(), exclusive.capture(), deleteOnNoConsumer.capture(), arguments.capture())).then( + new Answer<AMQQueue>() + { + @Override + public AMQQueue answer(InvocationOnMock invocation) throws Throwable + { + return _queueFactory.createAMQQueueImpl(id.getValue(), + queueName.getValue(), + durable.getValue(), + owner.getValue(), + autoDelete.getValue(), + exclusive.getValue(), + deleteOnNoConsumer.getValue(), + arguments.getValue()); + } + } + ); + } + + private void mockQueueRegistry() + { + _queueRegistry = mock(QueueRegistry.class); + + final ArgumentCaptor<AMQQueue> capturedQueue = ArgumentCaptor.forClass(AMQQueue.class); + doAnswer(new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + AMQQueue queue = capturedQueue.getValue(); + when(_queueRegistry.getQueue(eq(queue.getId()))).thenReturn(queue); + when(_queueRegistry.getQueue(eq(queue.getName()))).thenReturn(queue); + when(_virtualHost.getQueue(eq(queue.getId()))).thenReturn(queue); + when(_virtualHost.getQueue(eq(queue.getName()))).thenReturn(queue); + _queues.add(queue); + + return null; + } + }).when(_queueRegistry).registerQueue(capturedQueue.capture()); + } + + private void mockExchangeCreation() throws AMQException + { + final ArgumentCaptor<UUID> idCapture = ArgumentCaptor.forClass(UUID.class); + final ArgumentCaptor<String> exchangeNameCapture = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<String> type = ArgumentCaptor.forClass(String.class); + + when(_virtualHost.createExchange(idCapture.capture(), exchangeNameCapture.capture(), type.capture(), + anyBoolean(), anyBoolean(), anyString())).then( + new Answer<Exchange>() + { + @Override + public Exchange answer(InvocationOnMock invocation) throws Throwable + { + final String name = exchangeNameCapture.getValue(); + final UUID id = idCapture.getValue(); + + final Exchange exchange = mock(Exchange.class); + ExchangeType exType = mock(ExchangeType.class); + + when(exchange.getName()).thenReturn(name); + when(exchange.getId()).thenReturn(id); + when(exchange.getType()).thenReturn(exType); + final String typeName = type.getValue(); + when(exType.getType()).thenReturn(typeName); + when(exType.getName()).thenReturn(new AMQShortString(typeName)); + + when(_virtualHost.getExchange(eq(name))).thenReturn(exchange); + when(_virtualHost.getExchange(eq(id))).thenReturn(exchange); + + final ArgumentCaptor<AMQQueue> queue = ArgumentCaptor.forClass(AMQQueue.class); + + when(exchange.addBinding(anyString(),queue.capture(),anyMap())).then(new Answer<Boolean>() { + + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable + { + when(exchange.isBound(eq(queue.getValue()))).thenReturn(true); + return true; + } + }); + + return exchange; + } + } + ); + } + @Override public void tearDown() throws Exception { - try - { - _virtualHost.close(); - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } + super.tearDown(); } private void verifyRegisteredQueueCount(int count) { - assertEquals("Queue was not registered in virtualhost", count, _queueRegistry.getQueues().size()); + assertEquals("Queue was not registered in virtualhost", count, _virtualHost.getQueues().size()); } private void verifyQueueRegistered(String queueName) { - assertNotNull("Queue " + queueName + " was not created", _queueRegistry.getQueue(queueName)); + assertNotNull("Queue " + queueName + " was not created", _virtualHost.getQueue(queueName)); } public void testPriorityQueueRegistration() throws Exception { - FieldTable fieldTable = new FieldTable(); - fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5); + Map<String,Object> attributes = Collections.singletonMap(Queue.PRIORITIES, (Object) 5); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false, - false, _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testPriorityQueue", false, "owner", false, + false, + false, + attributes); assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass()); verifyQueueRegistered("testPriorityQueue"); @@ -115,43 +237,42 @@ public class AMQQueueFactoryTest extends QpidTestCase public void testSimpleQueueRegistration() throws Exception { String queueName = getName(); - AMQShortString dlQueueName = new AMQShortString(queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX); + String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, - false, _virtualHost, null); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, + false, + false, + null); assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass()); verifyQueueRegistered(queueName); //verify that no alternate exchange or DLQ were produced - QueueRegistry qReg = _virtualHost.getQueueRegistry(); assertNull("Queue should not have an alternate exchange as DLQ wasnt enabled", queue.getAlternateExchange()); - assertNull("The DLQ should not exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not exist", _virtualHost.getQueue(dlQueueName)); verifyRegisteredQueueCount(1); } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true does + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does * cause the alternate exchange to be set and DLQ to be produced. * @throws AMQException */ public void testDeadLetterQueueEnabled() throws AMQException { - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); String queueName = "testDeadLetterQueueEnabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); - - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + false, + attributes); Exchange altExchange = queue.getAlternateExchange(); assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange); @@ -161,7 +282,7 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); - AMQQueue dlQueue = qReg.getQueue(dlQueueName); + AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); assertNotNull("The DLQ was not registered as expected", dlQueue); assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); @@ -178,17 +299,20 @@ public class AMQQueueFactoryTest extends QpidTestCase */ public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception { + String queueName = "testDeadLetterQueueEnabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); + when(_queueConfiguration.getMaxDeliveryCount()).thenReturn(5); + when(_queueConfiguration.isDeadLetterQueueEnabled()).thenReturn(true); - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not yet exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, - _virtualHost, null); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + false, + null); assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryCount()); Exchange altExchange = queue.getAlternateExchange(); @@ -199,7 +323,7 @@ public class AMQQueueFactoryTest extends QpidTestCase assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getExchange(dlExchangeName)); assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getExchange(dlExchangeName)); - AMQQueue dlQueue = qReg.getQueue(dlQueueName); + AMQQueue dlQueue = _virtualHost.getQueue(dlQueueName); assertNotNull("The DLQ was not registered as expected", dlQueue); assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue)); assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange()); @@ -210,81 +334,77 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument false does not + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not * result in the alternate exchange being set and DLQ being created. * @throws AMQException */ public void testDeadLetterQueueDisabled() throws AMQException { - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, false); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) false); String queueName = "testDeadLetterQueueDisabled"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); - - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", false, false, + false, + attributes); assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange()); assertNull("The alternate exchange should still not exist", _virtualHost.getExchange(dlExchangeName)); - assertNull("The DLQ should still not exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should still not exist", _virtualHost.getQueue(dlQueueName)); //only 1 queue should have been registered verifyRegisteredQueueCount(1); } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_DLQ_ENABLED} argument true but + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but * creating an auto-delete queue, does not result in the alternate exchange * being set and DLQ being created. * @throws AMQException */ public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws AMQException { - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues"; String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX; String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX; - QueueRegistry qReg = _virtualHost.getQueueRegistry(); - - assertNull("The DLQ should not yet exist", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName)); assertNull("The alternate exchange should not exist", _virtualHost.getExchange(dlExchangeName)); //create an autodelete queue - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", true, false, + false, + attributes); assertTrue("Queue should be autodelete", queue.isAutoDelete()); //ensure that the autodelete property overrides the request to enable DLQ assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange()); assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getExchange(dlExchangeName)); - assertNull("The DLQ should not exist as queue is autodelete", qReg.getQueue(dlQueueName)); + assertNull("The DLQ should not exist as queue is autodelete", _virtualHost.getQueue(dlQueueName)); //only 1 queue should have been registered verifyRegisteredQueueCount(1); } /** - * Tests that setting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has + * Tests that setting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has * the desired effect. */ public void testMaximumDeliveryCount() throws Exception { - final FieldTable fieldTable = new FieldTable(); - fieldTable.setInteger(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, 5); + Map<String,Object> attributes = Collections.singletonMap(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5); - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, - _virtualHost, FieldTable.convertToMap(fieldTable)); + final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, + false, + attributes); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryCount()); @@ -293,13 +413,14 @@ public class AMQQueueFactoryTest extends QpidTestCase } /** - * Tests that omitting the {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means + * Tests that omitting the {@link QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means * that queue is created with a default maximumDeliveryCount of zero (unless set in config). */ public void testMaximumDeliveryCountDefault() throws Exception { - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, - _virtualHost, null); + final AMQQueue queue = _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "testMaximumDeliveryCount", false, "owner", false, false, + false, + null); assertNotNull("The queue was not registered as expected ", queue); assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryCount()); @@ -314,7 +435,9 @@ public class AMQQueueFactoryTest extends QpidTestCase { try { - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, _virtualHost, null); + _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, "owner", true, false, + false, + null); fail("queue with null name can not be created!"); } catch (Exception e) @@ -336,10 +459,9 @@ public class AMQQueueFactoryTest extends QpidTestCase // change DLQ name to make its length bigger than exchange name setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE"); - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", - false, false, _virtualHost, FieldTable.convertToMap(fieldTable)); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); + _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", + false, false, false, attributes); fail("queue with DLQ name having more than 255 characters can not be created!"); } catch (Exception e) @@ -362,10 +484,9 @@ public class AMQQueueFactoryTest extends QpidTestCase // change DLQ name to make its length bigger than exchange name setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE"); setTestSystemProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ"); - FieldTable fieldTable = new FieldTable(); - fieldTable.setBoolean(AMQQueueFactory.X_QPID_DLQ_ENABLED, true); - AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", - false, false, _virtualHost, FieldTable.convertToMap(fieldTable)); + Map<String,Object> attributes = Collections.singletonMap(Queue.CREATE_DLQ_ON_CREATION, (Object) true); + _queueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, "owner", + false, false, false, attributes); fail("queue with DLE name having more than 255 characters can not be created!"); } catch (Exception e) @@ -379,16 +500,17 @@ public class AMQQueueFactoryTest extends QpidTestCase public void testMessageGroupFromConfig() throws Exception { - PropertiesConfiguration queueConfig = new PropertiesConfiguration(); - queueConfig.addProperty("queues.queue.test.argument", "qpid.group_header_key=mykey"); - queueConfig.addProperty("queues.queue.test.argument", "qpid.shared_msg_group=1"); + Map<String,String> arguments = new HashMap<String, String>(); + arguments.put("qpid.group_header_key","mykey"); + arguments.put("qpid.shared_msg_group","1"); + QueueConfiguration qConf = mock(QueueConfiguration.class); + when(qConf.getArguments()).thenReturn(arguments); + when(qConf.getName()).thenReturn("test"); - final VirtualHostConfiguration vhostConfig = new VirtualHostConfiguration("test", queueConfig, _broker);; - QueueConfiguration qConf = new QueueConfiguration("test", vhostConfig); - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(qConf, _virtualHost); - assertEquals("mykey", queue.getArguments().get(SimpleAMQQueue.QPID_GROUP_HEADER_KEY)); - assertEquals("1", queue.getArguments().get(SimpleAMQQueue.QPID_SHARED_MSG_GROUP)); + AMQQueue queue = _queueFactory.createAMQQueueImpl(qConf); + assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY)); + assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS)); } private String generateStringWithLength(char ch, int length) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index b677ece408..e490db288c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -469,7 +469,14 @@ public class MockAMQQueue implements AMQQueue } - public Map<String, Object> getArguments() + @Override + public Collection<String> getAvailableAttributes() + { + return null; + } + + @Override + public Object getAttribute(String attrName) { return null; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 0faa796f1c..2328745b83 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -29,12 +29,12 @@ import static org.mockito.Matchers.contains; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; +import java.util.Map; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessageHeader; @@ -65,7 +65,7 @@ public class SimpleAMQQueueTest extends QpidTestCase private AMQShortString _routingKey = new AMQShortString("routing key"); private DirectExchange _exchange; private MockSubscription _subscription = new MockSubscription(); - private FieldTable _arguments = null; + private Map<String,Object> _arguments = null; private MessagePublishInfo info = new MessagePublishInfo() { @@ -103,8 +103,8 @@ public class SimpleAMQQueueTest extends QpidTestCase _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), - false, false, _virtualHost, FieldTable.convertToMap(_arguments)); + _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), + false, false, false, _arguments); _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString()); } @@ -127,8 +127,11 @@ public class SimpleAMQQueueTest extends QpidTestCase public void testCreateQueue() throws AMQException { _queue.stop(); - try { - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), null, false, _owner.asString(), false, false, _virtualHost, FieldTable.convertToMap(_arguments)); + try + { + _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), null, + false, _owner.asString(), false, + false, false, _arguments); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -137,7 +140,8 @@ public class SimpleAMQQueueTest extends QpidTestCase e.getMessage().contains("name")); } - try { + try + { _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP); assertNull("Queue was created", _queue); } @@ -147,8 +151,10 @@ public class SimpleAMQQueueTest extends QpidTestCase e.getMessage().contains("Host")); } - _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), _qname.asString(), false, _owner.asString(), false, - false, _virtualHost, FieldTable.convertToMap(_arguments)); + _queue = (SimpleAMQQueue) _virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), + "differentName", false, + _owner.asString(), false, + false, false, _arguments); assertNotNull("Queue was not created", _queue); } @@ -1225,12 +1231,12 @@ public class SimpleAMQQueueTest extends QpidTestCase return _subscription; } - public FieldTable getArguments() + public Map<String,Object> getArguments() { return _arguments; } - public void setArguments(FieldTable arguments) + public void setArguments(Map<String,Object> arguments) { _arguments = arguments; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java index 4abb7233dc..c115af5a38 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java @@ -50,8 +50,8 @@ public class SimpleAMQQueueThreadPoolTest extends QpidTestCase try { - SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "test", false, - "owner", false, false, test, null); + SimpleAMQQueue queue = (SimpleAMQQueue) + test.createQueue(UUIDGenerator.generateRandomUUID(), "test", false, "owner", false, false, false, null); assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 67cf0780da..e9ad4ba236 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -52,6 +52,9 @@ import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRec import org.apache.qpid.server.store.Transaction.Record; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase { @@ -178,7 +181,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testBindQueue() throws Exception { - AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); DurableConfigurationStoreHelper.createBinding(_configStore, binding); @@ -197,7 +200,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUnbindQueue() throws Exception { - AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null); Binding binding = new Binding(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue, _exchange, FieldTable.convertToMap(_bindingArgs)); DurableConfigurationStoreHelper.createBinding(_configStore, binding); @@ -212,8 +215,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testCreateQueueAMQQueue() throws Exception { - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, null); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); reopenStore(); Map<String, Object> queueAttributes = new HashMap<String, Object>(); @@ -225,13 +228,12 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testCreateQueueAMQQueueFieldTable() throws Exception { - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); reopenStore(); @@ -241,7 +243,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE); - queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.putAll(attributes); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); } @@ -250,8 +252,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest { Exchange alternateExchange = createTestAlternateExchange(); - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, null); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); reopenStore(); @@ -275,16 +277,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUpdateQueueExclusivity() throws Exception { // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); + + DurableConfigurationStoreHelper.createQueue(_configStore, queue); // update the queue to have exclusive=false - queue = createTestQueue(getName(), getName() + "Owner", false); - when(queue.getArguments()).thenReturn(attributes); + queue = createTestQueue(getName(), getName() + "Owner", false, attributes); DurableConfigurationStoreHelper.updateQueue(_configStore, queue); @@ -295,7 +296,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); - queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.putAll(attributes); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -304,17 +305,15 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testUpdateQueueAlternateExchange() throws Exception { // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); // update the queue to have exclusive=false Exchange alternateExchange = createTestAlternateExchange(); - queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange); - when(queue.getArguments()).thenReturn(attributes); + queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes); DurableConfigurationStoreHelper.updateQueue(_configStore, queue); @@ -325,7 +324,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest queueAttributes.put(Queue.NAME, getName()); queueAttributes.put(Queue.OWNER, getName()+"Owner"); queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE); - queueAttributes.put(Queue.ARGUMENTS, attributes); + queueAttributes.putAll(attributes); queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString()); verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes)); @@ -334,12 +333,11 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest public void testRemoveQueue() throws Exception { // create queue - AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); - attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); - FieldTable arguments = FieldTable.convertToFieldTable(attributes); - DurableConfigurationStoreHelper.createQueue(_configStore, queue, arguments); + attributes.put(Queue.CREATE_DLQ_ON_CREATION, Boolean.TRUE); + attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10); + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true, attributes); + DurableConfigurationStoreHelper.createQueue(_configStore, queue); // remove queue DurableConfigurationStoreHelper.removeQueue(_configStore,queue); @@ -349,12 +347,19 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest anyMap()); } - private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException + private AMQQueue createTestQueue(String queueName, + String queueOwner, + boolean exclusive, + final Map<String, Object> arguments) throws AMQStoreException { - return createTestQueue(queueName, queueOwner, exclusive, null); + return createTestQueue(queueName, queueOwner, exclusive, null, arguments); } - private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive, Exchange alternateExchange) throws AMQStoreException + private AMQQueue createTestQueue(String queueName, + String queueOwner, + boolean exclusive, + Exchange alternateExchange, + final Map<String, Object> arguments) throws AMQStoreException { AMQQueue queue = mock(AMQQueue.class); when(queue.getName()).thenReturn(queueName); @@ -363,6 +368,23 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest when(queue.isExclusive()).thenReturn(exclusive); when(queue.getId()).thenReturn(_queueId); when(queue.getAlternateExchange()).thenReturn(alternateExchange); + if(arguments != null && !arguments.isEmpty()) + { + when(queue.getAvailableAttributes()).thenReturn(arguments.keySet()); + final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class); + when(queue.getAttribute(requestedAttribute.capture())).then( + new Answer() + { + + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + String attrName = requestedAttribute.getValue(); + return arguments.get(attrName); + } + }); + } + return queue; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index caf6acb4bb..cb1fc2737d 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -181,9 +181,8 @@ public class BrokerTestHelper public static SimpleAMQQueue createQueue(String queueName, VirtualHost virtualHost) throws AMQException { - SimpleAMQQueue queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), queueName, false, null, - false, false, virtualHost, Collections.<String, Object>emptyMap()); - virtualHost.getQueueRegistry().registerQueue(queue); + SimpleAMQQueue queue = (SimpleAMQQueue) virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null, + false, false, false, Collections.<String, Object>emptyMap()); return queue; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 2d3483f078..66a71c562f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -45,6 +45,8 @@ import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.QueueFactory; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.*; import org.apache.qpid.server.security.SecurityManager; @@ -82,7 +84,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase private DurableConfigurationStore _store; private ExchangeFactory _exchangeFactory; private ExchangeRegistry _exchangeRegistry; - private QueueRegistry _queueRegistry; + private QueueFactory _queueFactory; @Override public void setUp() throws Exception @@ -105,6 +107,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange); when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange); + when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue); final ArgumentCaptor<Exchange> registeredExchange = ArgumentCaptor.forClass(Exchange.class); doAnswer(new Answer() @@ -114,37 +117,68 @@ public class DurableConfigurationRecovererTest extends QpidTestCase public Object answer(final InvocationOnMock invocation) throws Throwable { Exchange exchange = registeredExchange.getValue(); - when(_exchangeRegistry.getExchange(exchange.getId())).thenReturn(exchange); - when(_exchangeRegistry.getExchange(exchange.getName())).thenReturn(exchange); + when(_exchangeRegistry.getExchange(eq(exchange.getId()))).thenReturn(exchange); + when(_exchangeRegistry.getExchange(eq(exchange.getName()))).thenReturn(exchange); return null; } }).when(_exchangeRegistry).registerExchange(registeredExchange.capture()); - _queueRegistry = mock(QueueRegistry.class); - when(_vhost.getQueueRegistry()).thenReturn(_queueRegistry); + final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class); + final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class); + final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class); - when(_queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue); + _queueFactory = mock(QueueFactory.class); - final ArgumentCaptor<AMQQueue> registeredQueue = ArgumentCaptor.forClass(AMQQueue.class); - doAnswer(new Answer() - { + when(_queueFactory.createAMQQueueImpl(idArg.capture(), queueArg.capture(), + anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then( + new Answer() + { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable - { - AMQQueue queue = registeredQueue.getValue(); - when(_queueRegistry.getQueue(queue.getId())).thenReturn(queue); - when(_queueRegistry.getQueue(queue.getName())).thenReturn(queue); - return null; - } - }).when(_queueRegistry).registerQueue(registeredQueue.capture()); + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + final AMQQueue queue = mock(AMQQueue.class); + + final String queueName = queueArg.getValue(); + final UUID queueId = idArg.getValue(); + + when(queue.getName()).thenReturn(queueName); + when(queue.getId()).thenReturn(queueId); + when(_vhost.getQueue(eq(queueName))).thenReturn(queue); + when(_vhost.getQueue(eq(queueId))).thenReturn(queue); + + final ArgumentCaptor<Exchange> altExchangeArg = ArgumentCaptor.forClass(Exchange.class); + doAnswer( + new Answer() + { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable + { + final Exchange value = altExchangeArg.getValue(); + when(queue.getAlternateExchange()).thenReturn(value); + return null; + } + } + ).when(queue).setAlternateExchange(altExchangeArg.capture()); + + Map args = argsArg.getValue(); + if(args.containsKey(Queue.ALTERNATE_EXCHANGE)) + { + final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString()); + final Exchange exchange = _exchangeRegistry.getExchange(exchangeId); + queue.setAlternateExchange(exchange); + } + return queue; + } + }); _exchangeFactory = mock(ExchangeFactory.class); + DurableConfiguredObjectRecoverer[] recoverers = { - new QueueRecoverer(_vhost, _exchangeRegistry), + new QueueRecoverer(_vhost, _exchangeRegistry, _queueFactory), new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory), new BindingRecoverer(_vhost, _exchangeRegistry) }; @@ -356,24 +390,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase final UUID queueId = new UUID(1, 0); final UUID exchangeId = new UUID(2, 0); - /* These lines necessary to get queue creation to work because AMQQueueFactory is called directly rather than - queue creation being on vhost - yuck! */ - SecurityManager securityManager = mock(SecurityManager.class); - when(_vhost.getSecurityManager()).thenReturn(securityManager); - when(securityManager.authoriseCreateQueue(anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(), - any(AMQShortString.class),anyString())).thenReturn(true); - VirtualHostConfiguration configuration = mock(VirtualHostConfiguration.class); - when(_vhost.getConfiguration()).thenReturn(configuration); - QueueConfiguration queueConfiguration = mock(QueueConfiguration.class); - when(configuration.getQueueConfiguration(anyString())).thenReturn(queueConfiguration); - LogActor logActor = mock(LogActor.class); - CurrentActor.set(logActor); - RootMessageLogger rootLogger = mock(RootMessageLogger.class); - when(logActor.getRootMessageLogger()).thenReturn(rootLogger); - /* end of queue creation mock hackery */ - final Exchange customExchange = mock(Exchange.class); + when(customExchange.getId()).thenReturn(exchangeId); + when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME); + when(_exchangeFactory.createExchange(eq(exchangeId), eq(CUSTOM_EXCHANGE_NAME), eq(HeadersExchange.TYPE.getType()), @@ -390,7 +411,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _durableConfigurationRecoverer.completeConfigurationRecovery(); - assertEquals(_queueRegistry.getQueue(queueId).getAlternateExchange(), customExchange); + assertEquals(customExchange, _vhost.getQueue(queueId).getAlternateExchange()); } private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws AMQStoreException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 6769c1c2fc..1ca7ff1b65 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.virtualhost; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ScheduledFuture; import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -28,6 +29,7 @@ import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; @@ -119,6 +121,43 @@ public class MockVirtualHost implements VirtualHost } @Override + public AMQQueue getQueue(String name) + { + return null; + } + + @Override + public AMQQueue getQueue(UUID id) + { + return null; + } + + @Override + public Collection<AMQQueue> getQueues() + { + return null; + } + + @Override + public int removeQueue(AMQQueue queue) throws AMQException + { + return 0; + } + + @Override + public AMQQueue createQueue(UUID id, + String queueName, + boolean durable, + String owner, + boolean autoDelete, + boolean exclusive, + boolean deleteOnNoConsumer, + Map<String, Object> arguments) throws AMQException + { + return null; + } + + @Override public Exchange createExchange(UUID id, String exchange, String type, @@ -141,6 +180,12 @@ public class MockVirtualHost implements VirtualHost } @Override + public Exchange getExchange(UUID id) + { + return null; + } + + @Override public Exchange getDefaultExchange() { return null; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index e72196c383..03cb483e40 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -244,7 +244,7 @@ public class StandardVirtualHostTest extends QpidTestCase VirtualHost vhost = createVirtualHost(vhostName, config); assertNotNull("virtualhost should exist", vhost); - AMQQueue queue = vhost.getQueueRegistry().getQueue(queueName); + AMQQueue queue = vhost.getQueue(queueName); assertNotNull("queue should exist", queue); Exchange defaultExch = vhost.getDefaultExchange(); |
